From d38fc909e3e99341eb83d8d7fdda7a7bd1a8fa32 Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Thu, 5 Sep 2024 17:55:59 -0700 Subject: [PATCH] Support ReadableStream in request.clone & response.clone() (#13744) --- src/bun.js/bindings/BunObject.cpp | 9 +- src/bun.js/bindings/ZigGlobalObject.cpp | 47 +- .../bindings/webcore/StructuredClone.cpp | 4 + src/bun.js/webcore/body.zig | 76 ++- src/bun.js/webcore/request.zig | 22 +- src/bun.js/webcore/response.zig | 27 +- src/bun.js/webcore/streams.zig | 25 + src/codegen/bundle-functions.ts | 36 +- src/js/builtins/ReadableStream.ts | 151 ++++- src/js/builtins/ReadableStreamInternals.ts | 137 ++-- test/js/web/fetch/body-clone.test.ts | 520 +++++++++++++++ test/js/web/fetch/body-stream.test.ts | 626 +++++++++--------- test/js/web/fetch/stream-fast-path.test.ts | 11 +- 13 files changed, 1312 insertions(+), 379 deletions(-) create mode 100644 test/js/web/fetch/body-clone.test.ts diff --git a/src/bun.js/bindings/BunObject.cpp b/src/bun.js/bindings/BunObject.cpp index caed3789fbc73..0b489400d475c 100644 --- a/src/bun.js/bindings/BunObject.cpp +++ b/src/bun.js/bindings/BunObject.cpp @@ -85,7 +85,7 @@ static inline JSC::EncodedJSValue flattenArrayOfBuffersIntoArrayBufferOrUint8Arr } size_t arrayLength = array->length(); - if (arrayLength < 1) { + const auto returnEmptyArrayBufferView = [&]() -> EncodedJSValue { if (asUint8Array) { return JSValue::encode( JSC::JSUint8Array::create( @@ -95,6 +95,10 @@ static inline JSC::EncodedJSValue flattenArrayOfBuffersIntoArrayBufferOrUint8Arr } RELEASE_AND_RETURN(throwScope, JSValue::encode(JSC::JSArrayBuffer::create(vm, lexicalGlobalObject->arrayBufferStructure(), JSC::ArrayBuffer::create(static_cast(0), 1)))); + }; + + if (arrayLength < 1) { + return returnEmptyArrayBufferView(); } size_t byteLength = 0; @@ -149,7 +153,7 @@ static inline JSC::EncodedJSValue flattenArrayOfBuffersIntoArrayBufferOrUint8Arr byteLength = std::min(byteLength, maxLength); if (byteLength == 0) { - RELEASE_AND_RETURN(throwScope, JSValue::encode(JSC::JSArrayBuffer::create(vm, lexicalGlobalObject->arrayBufferStructure(), JSC::ArrayBuffer::create(static_cast(0), 1)))); + return returnEmptyArrayBufferView(); } auto buffer = JSC::ArrayBuffer::tryCreateUninitialized(byteLength, 1); @@ -237,6 +241,7 @@ JSC_DEFINE_HOST_FUNCTION(functionConcatTypedArrays, (JSGlobalObject * globalObje auto arg2 = callFrame->argument(2); if (!arg2.isUndefined()) { asUint8Array = arg2.toBoolean(globalObject); + RETURN_IF_EXCEPTION(throwScope, {}); } return flattenArrayOfBuffersIntoArrayBufferOrUint8Array(globalObject, arrayValue, maxLength, asUint8Array); diff --git a/src/bun.js/bindings/ZigGlobalObject.cpp b/src/bun.js/bindings/ZigGlobalObject.cpp index 99f91b91565a3..02f5ec4f6c2fe 100644 --- a/src/bun.js/bindings/ZigGlobalObject.cpp +++ b/src/bun.js/bindings/ZigGlobalObject.cpp @@ -1991,8 +1991,53 @@ JSC_DEFINE_HOST_FUNCTION(isAbortSignal, (JSGlobalObject*, CallFrame* callFrame)) ASSERT(callFrame->argumentCount() == 1); return JSValue::encode(jsBoolean(callFrame->uncheckedArgument(0).inherits())); } +static inline std::optional invokeReadableStreamFunction(JSC::JSGlobalObject& lexicalGlobalObject, const JSC::Identifier& identifier, JSC::JSValue thisValue, const JSC::MarkedArgumentBuffer& arguments) +{ + JSC::VM& vm = lexicalGlobalObject.vm(); + JSC::JSLockHolder lock(vm); + + auto function = lexicalGlobalObject.get(&lexicalGlobalObject, identifier); + ASSERT(function.isCallable()); + + auto scope = DECLARE_CATCH_SCOPE(vm); + auto callData = JSC::getCallData(function); + auto result = call(&lexicalGlobalObject, function, callData, thisValue, arguments); +#if BUN_DEBUG + if (scope.exception()) { + Bun__reportError(&lexicalGlobalObject, JSValue::encode(scope.exception())); + } +#endif + EXCEPTION_ASSERT(!scope.exception() || vm.hasPendingTerminationException()); + if (scope.exception()) + return {}; + return result; +} +extern "C" bool ReadableStream__tee(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject, JSC__JSValue* possibleReadableStream1, JSC__JSValue* possibleReadableStream2) +{ + auto* readableStream = jsDynamicCast(JSC::JSValue::decode(possibleReadableStream)); + if (UNLIKELY(!readableStream)) + return false; + + auto& lexicalGlobalObject = *globalObject; + auto* clientData = static_cast(lexicalGlobalObject.vm().clientData); + auto& privateName = clientData->builtinFunctions().readableStreamInternalsBuiltins().readableStreamTeePrivateName(); + + MarkedArgumentBuffer arguments; + arguments.append(readableStream); + arguments.append(JSC::jsBoolean(true)); + ASSERT(!arguments.hasOverflowed()); + auto returnedValue = invokeReadableStreamFunction(lexicalGlobalObject, privateName, JSC::jsUndefined(), arguments); + if (!returnedValue) + return false; + + auto results = Detail::SequenceConverter::convert(lexicalGlobalObject, *returnedValue); + + ASSERT(results.size() == 2); + *possibleReadableStream1 = JSValue::encode(results[0]); + *possibleReadableStream2 = JSValue::encode(results[1]); + return true; +} -extern "C" void ReadableStream__cancel(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject); extern "C" void ReadableStream__cancel(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject) { auto* readableStream = jsDynamicCast(JSC::JSValue::decode(possibleReadableStream)); diff --git a/src/bun.js/bindings/webcore/StructuredClone.cpp b/src/bun.js/bindings/webcore/StructuredClone.cpp index 088d9aa395ab8..f0f8691bd23ba 100644 --- a/src/bun.js/bindings/webcore/StructuredClone.cpp +++ b/src/bun.js/bindings/webcore/StructuredClone.cpp @@ -77,6 +77,10 @@ JSC_DEFINE_HOST_FUNCTION(structuredCloneForStream, (JSGlobalObject * globalObjec JSValue value = callFrame->uncheckedArgument(0); + if (value.isPrimitive()) { + return JSValue::encode(value); + } + if (value.inherits()) RELEASE_AND_RETURN(scope, cloneArrayBufferImpl(globalObject, callFrame, CloneMode::Full)); diff --git a/src/bun.js/webcore/body.zig b/src/bun.js/webcore/body.zig index c1cd64639f4d1..e5bc299c2383d 100644 --- a/src/bun.js/webcore/body.zig +++ b/src/bun.js/webcore/body.zig @@ -482,7 +482,7 @@ pub const Body = struct { if (locked.readable.get()) |readable| { return readable.value; } - if (locked.promise != null) { + if (locked.promise != null or locked.action != .none) { return JSC.WebCore.ReadableStream.used(globalThis); } var drain_result: JSC.WebCore.DrainResult = .{ @@ -982,7 +982,81 @@ pub const Body = struct { this.Error.deinit(); } } + + pub fn tee(this: *Value, globalThis: *JSC.JSGlobalObject) Value { + var locked = &this.Locked; + + if (locked.readable.isDisturbed(globalThis)) { + return Value{ .Used = {} }; + } + + if (locked.readable.tee(globalThis)) |readable| { + return Value{ + .Locked = .{ + .readable = JSC.WebCore.ReadableStream.Strong.init(readable, globalThis), + .global = globalThis, + }, + }; + } + if (locked.promise != null or locked.action != .none or locked.readable.has()) { + return Value{ .Used = {} }; + } + + var drain_result: JSC.WebCore.DrainResult = .{ + .estimated_size = 0, + }; + + if (locked.onStartStreaming) |drain| { + locked.onStartStreaming = null; + drain_result = drain(locked.task.?); + } + + if (drain_result == .empty or drain_result == .aborted) { + this.* = .{ .Null = {} }; + return Value{ .Null = {} }; + } + + var reader = JSC.WebCore.ByteStream.Source.new(.{ + .context = undefined, + .globalThis = globalThis, + }); + + reader.context.setup(); + + if (drain_result == .estimated_size) { + reader.context.highWaterMark = @as(Blob.SizeType, @truncate(drain_result.estimated_size)); + reader.context.size_hint = @as(Blob.SizeType, @truncate(drain_result.estimated_size)); + } else if (drain_result == .owned) { + reader.context.buffer = drain_result.owned.list; + reader.context.size_hint = @as(Blob.SizeType, @truncate(drain_result.owned.size_hint)); + } + + locked.readable = JSC.WebCore.ReadableStream.Strong.init(.{ + .ptr = .{ .Bytes = &reader.context }, + .value = reader.toReadableStream(globalThis), + }, globalThis); + + if (locked.onReadableStreamAvailable) |onReadableStreamAvailable| { + onReadableStreamAvailable(locked.task.?, globalThis, locked.readable.get().?); + } + + const teed = locked.readable.tee(globalThis) orelse return Value{ .Used = {} }; + + return Value{ + .Locked = .{ + .readable = JSC.WebCore.ReadableStream.Strong.init(teed, globalThis), + .global = globalThis, + }, + }; + } + pub fn clone(this: *Value, globalThis: *JSC.JSGlobalObject) Value { + this.toBlobIfPossible(); + + if (this.* == .Locked) { + return this.tee(globalThis); + } + if (this.* == .InternalBlob) { var internal_blob = this.InternalBlob; this.* = .{ diff --git a/src/bun.js/webcore/request.zig b/src/bun.js/webcore/request.zig index 1a0b2e9703e42..7898efa533efd 100644 --- a/src/bun.js/webcore/request.zig +++ b/src/bun.js/webcore/request.zig @@ -744,8 +744,9 @@ pub const Request = struct { pub fn doClone( this: *Request, globalThis: *JSC.JSGlobalObject, - _: *JSC.CallFrame, + callframe: *JSC.CallFrame, ) JSC.JSValue { + const this_value = callframe.this(); var cloned = this.clone(getAllocator(globalThis), globalThis); if (globalThis.hasException()) { @@ -753,7 +754,24 @@ pub const Request = struct { return .zero; } - return cloned.toJS(globalThis); + const js_wrapper = cloned.toJS(globalThis); + if (js_wrapper != .zero) { + if (cloned.body.value == .Locked) { + if (cloned.body.value.Locked.readable.get()) |readable| { + // If we are teed, then we need to update the cached .body + // value to point to the new readable stream + // We must do this on both the original and cloned request + // but especially the original request since it will have a stale .body value now. + Request.bodySetCached(js_wrapper, globalThis, readable.value); + + if (this.body.value.Locked.readable.get()) |other_readable| { + Request.bodySetCached(this_value, globalThis, other_readable.value); + } + } + } + } + + return js_wrapper; } // Returns if the request has headers already cached/set. diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index f665dd86194dc..21866e5b5fe1f 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -261,10 +261,33 @@ pub const Response = struct { pub fn doClone( this: *Response, globalThis: *JSC.JSGlobalObject, - _: *JSC.CallFrame, + callframe: *JSC.CallFrame, ) JSValue { + const this_value = callframe.this(); const cloned = this.clone(globalThis); - return Response.makeMaybePooled(globalThis, cloned); + if (globalThis.hasException()) { + cloned.finalize(); + return .zero; + } + + const js_wrapper = Response.makeMaybePooled(globalThis, cloned); + + if (js_wrapper != .zero) { + if (cloned.body.value == .Locked) { + if (cloned.body.value.Locked.readable.get()) |readable| { + // If we are teed, then we need to update the cached .body + // value to point to the new readable stream + // We must do this on both the original and cloned response + // but especially the original response since it will have a stale .body value now. + Response.bodySetCached(js_wrapper, globalThis, readable.value); + if (this.body.value.Locked.readable.get()) |other_readable| { + Response.bodySetCached(this_value, globalThis, other_readable.value); + } + } + } + } + + return js_wrapper; } pub fn makeMaybePooled(globalObject: *JSC.JSGlobalObject, ptr: *Response) JSValue { diff --git a/src/bun.js/webcore/streams.zig b/src/bun.js/webcore/streams.zig index cfee795efff6a..3397ef0657eea 100644 --- a/src/bun.js/webcore/streams.zig +++ b/src/bun.js/webcore/streams.zig @@ -57,6 +57,10 @@ pub const ReadableStream = struct { return this.held.globalThis; } + pub fn has(this: *Strong) bool { + return this.held.has(); + } + pub fn isDisturbed(this: *const Strong, global: *JSC.JSGlobalObject) bool { if (this.get()) |stream| { return stream.isDisturbed(global); @@ -84,8 +88,29 @@ pub const ReadableStream = struct { // } this.held.deinit(); } + + pub fn tee(this: *Strong, global: *JSGlobalObject) ?ReadableStream { + if (this.get()) |stream| { + const first, const second = stream.tee(global) orelse return null; + this.held.set(global, first.value); + return second; + } + return null; + } }; + extern fn ReadableStream__tee(stream: JSValue, globalThis: *JSGlobalObject, out1: *JSC.JSValue, out2: *JSC.JSValue) bool; + pub fn tee(this: *const ReadableStream, globalThis: *JSGlobalObject) ?struct { ReadableStream, ReadableStream } { + var out1: JSC.JSValue = .zero; + var out2: JSC.JSValue = .zero; + if (!ReadableStream__tee(this.value, globalThis, &out1, &out2)) { + return null; + } + const out_stream2 = ReadableStream.fromJS(out2, globalThis) orelse return null; + const out_stream1 = ReadableStream.fromJS(out1, globalThis) orelse return null; + return .{ out_stream1, out_stream2 }; + } + pub fn toJS(this: *const ReadableStream) JSValue { return this.value; } diff --git a/src/codegen/bundle-functions.ts b/src/codegen/bundle-functions.ts index 461adda7be9e3..0ada057c1f7ba 100644 --- a/src/codegen/bundle-functions.ts +++ b/src/codegen/bundle-functions.ts @@ -44,6 +44,7 @@ interface ParsedBuiltin { directives: Record; source: string; async: boolean; + enums: string[]; } interface BundledBuiltin { @@ -74,13 +75,15 @@ async function processFileSplit(filename: string): Promise<{ functions: BundledB // and then compile those separately const consumeWhitespace = /^\s*/; - const consumeTopLevelContent = /^(\/\*|\/\/|type|import|interface|\$|export (?:async )?function|(?:async )?function)/; - const consumeEndOfType = /;|.(?=export|type|interface|\$|\/\/|\/\*|function)/; + const consumeTopLevelContent = + /^(\/\*|\/\/|type|import|interface|\$|const enum|export (?:async )?function|(?:async )?function)/; + const consumeEndOfType = /;|.(?=export|type|interface|\$|\/\/|\/\*|function|const enum)/; const functions: ParsedBuiltin[] = []; let directives: Record = {}; const bundledFunctions: BundledBuiltin[] = []; let internal = false; + const topLevelEnums: { name: string; code: string }[] = []; while (contents.length) { contents = contents.replace(consumeWhitespace, ""); @@ -107,6 +110,16 @@ async function processFileSplit(filename: string): Promise<{ functions: BundledB contents = contents.slice(i + 1); } else if (match[1] === "interface") { contents = sliceSourceCode(contents, false).rest; + } else if (match[1] === "const enum") { + const { result, rest } = sliceSourceCode(contents, false); + const i = result.indexOf("{\n"); + // Support const enums in module scope. + topLevelEnums.push({ + name: result.slice("const enum ".length, i).trim(), + code: "\n" + result, + }); + + contents = rest; } else if (match[1] === "$") { const directive = contents.match(/^\$([a-zA-Z0-9]+)(?:\s*=\s*([^\r\n]+?))?\s*;?\r?\n/); if (!directive) { @@ -148,12 +161,27 @@ async function processFileSplit(filename: string): Promise<{ functions: BundledB globalThis.requireTransformer(x, SRC_DIR + "/" + basename), ); + const source = result.trim().slice(2, -1); + const constEnumsUsedInFunction: string[] = []; + if (topLevelEnums.length) { + // If the function references a top-level const enum let's add the code + // to the top-level scope of the function so that the transpiler will + // inline all the values and strip out the enum object. + for (const { name, code } of topLevelEnums) { + // Only include const enums which are referenced in the function source. + if (source.includes(name)) { + constEnumsUsedInFunction.push(code); + } + } + } + functions.push({ name, params, directives, - source: result.trim().slice(2, -1), + source, async, + enums: constEnumsUsedInFunction, }); contents = rest; directives = {}; @@ -178,7 +206,7 @@ async function processFileSplit(filename: string): Promise<{ functions: BundledB `// @ts-nocheck // GENERATED TEMP FILE - DO NOT EDIT // Sourced from ${path.relative(TMP_DIR, filename)} - +${fn.enums.join("\n")} // do not allow the bundler to rename a symbol to $ ($); diff --git a/src/js/builtins/ReadableStream.ts b/src/js/builtins/ReadableStream.ts index d34451a0fd9e1..ed45aaab37227 100644 --- a/src/js/builtins/ReadableStream.ts +++ b/src/js/builtins/ReadableStream.ts @@ -151,16 +151,67 @@ export function readableStreamToArrayBuffer(stream: ReadableStream) } result = Bun.readableStreamToArray(stream); - if ($isPromise(result)) { - // `result` is an InternalPromise, which doesn't have a `.then` method - // but `.then` isn't user-overridable, so we can use it safely. - return result.then(x => (x.length === 1 && x[0] instanceof ArrayBuffer ? x[0] : Bun.concatArrayBuffers(x))); + + function toArrayBuffer(result: unknown[]) { + switch (result.length) { + case 0: { + return new ArrayBuffer(0); + } + case 1: { + const view = result[0]; + if (view instanceof ArrayBuffer || view instanceof SharedArrayBuffer) { + return view; + } + + if (ArrayBuffer.isView(view)) { + const buffer = view.buffer; + const byteOffset = view.byteOffset; + const byteLength = view.byteLength; + if (byteOffset === 0 && byteLength === buffer.byteLength) { + return buffer; + } + + return buffer.slice(byteOffset, byteOffset + byteLength); + } + + if (typeof view === "string") { + return new TextEncoder().encode(view); + } + } + default: { + let anyStrings = false; + for (const chunk of result) { + if (typeof chunk === "string") { + anyStrings = true; + break; + } + } + + if (!anyStrings) { + return Bun.concatArrayBuffers(result, false); + } + + const sink = new Bun.ArrayBufferSink(); + sink.start(); + + for (const chunk of result) { + sink.write(chunk); + } + + return sink.end() as Uint8Array; + } + } } - if (result.length === 1) { - return result[0]; + if ($isPromise(result)) { + const completedResult = Bun.peek(result); + if (completedResult !== result) { + result = completedResult; + } else { + return result.then(toArrayBuffer); + } } - return Bun.concatArrayBuffers(result); + return $createFulfilledPromise(toArrayBuffer(result)); } $linkTimeConstant; @@ -180,22 +231,65 @@ export function readableStreamToBytes(stream: ReadableStream): Prom } result = Bun.readableStreamToArray(stream); - if ($isPromise(result)) { - // `result` is an InternalPromise, which doesn't have a `.then` method - // but `.then` isn't user-overridable, so we can use it safely. - return result.then(x => { - // Micro-optimization: if the result is a single Uint8Array chunk, let's just return it without cloning. - if (x.length === 1 && x[0] instanceof ArrayBuffer) { - return new Uint8Array(x[0]); + + function toBytes(result: unknown[]) { + switch (result.length) { + case 0: { + return new Uint8Array(0); } - return Bun.concatArrayBuffers(x, Infinity, true); - }); + case 1: { + const view = result[0]; + if (view instanceof Uint8Array) { + return view; + } + + if (ArrayBuffer.isView(view)) { + return new Uint8Array(view.buffer, view.byteOffset, view.byteLength); + } + + if (view instanceof ArrayBuffer || view instanceof SharedArrayBuffer) { + return new Uint8Array(view); + } + + if (typeof view === "string") { + return new TextEncoder().encode(view); + } + } + default: { + let anyStrings = false; + for (const chunk of result) { + if (typeof chunk === "string") { + anyStrings = true; + break; + } + } + + if (!anyStrings) { + return Bun.concatArrayBuffers(result, true); + } + + const sink = new Bun.ArrayBufferSink(); + sink.start({ asUint8Array: true }); + + for (const chunk of result) { + sink.write(chunk); + } + + return sink.end() as Uint8Array; + } + } } - if (result.length === 1 && result[0] instanceof ArrayBuffer) { - return new Uint8Array(result[0]); + if ($isPromise(result)) { + const completedResult = Bun.peek(result); + if (completedResult !== result) { + result = completedResult; + } else { + return result.then(toBytes); + } } - return Bun.concatArrayBuffers(result, Infinity, true); + + return $createFulfilledPromise(toBytes(result)); } $linkTimeConstant; @@ -212,11 +306,22 @@ export function readableStreamToFormData( $linkTimeConstant; export function readableStreamToJSON(stream: ReadableStream): unknown { if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked")); + let result = $tryUseReadableStreamBufferedFastPath(stream, "json"); + if (result) { + return result; + } - return ( - $tryUseReadableStreamBufferedFastPath(stream, "json") || - Promise.resolve(Bun.readableStreamToText(stream)).then(globalThis.JSON.parse) - ); + let text = Bun.readableStreamToText(stream); + const peeked = Bun.peek(text); + if (peeked !== text) { + try { + return $createFulfilledPromise(globalThis.JSON.parse(peeked)); + } catch (e) { + return Promise.reject(e); + } + } + + return text.then(globalThis.JSON.parse); } $linkTimeConstant; diff --git a/src/js/builtins/ReadableStreamInternals.ts b/src/js/builtins/ReadableStreamInternals.ts index e26d46689dc0c..7f95f39ee9b9d 100644 --- a/src/js/builtins/ReadableStreamInternals.ts +++ b/src/js/builtins/ReadableStreamInternals.ts @@ -490,6 +490,14 @@ export function pipeToFinalize(pipeState) { else pipeState.promiseCapability.resolve.$call(); } +const enum TeeStateFlags { + canceled1 = 1 << 0, + canceled2 = 1 << 1, + reading = 1 << 2, + closedOrErrored = 1 << 3, + readAgain = 1 << 4, +} + export function readableStreamTee(stream, shouldClone) { $assert($isReadableStream(stream)); $assert(typeof shouldClone === "boolean"); @@ -503,34 +511,41 @@ export function readableStreamTee(stream, shouldClone) { const reader = new $ReadableStreamDefaultReader(stream); const teeState = { - closedOrErrored: false, - canceled1: false, - canceled2: false, + stream, + flags: 0, reason1: undefined, reason2: undefined, + branch1Source: undefined, + branch2Source: undefined, + branch1: undefined, + branch2: undefined, + cancelPromiseCapability: $newPromiseCapability(Promise), }; - teeState.cancelPromiseCapability = $newPromiseCapability(Promise); - const pullFunction = $readableStreamTeePullFunction(teeState, reader, shouldClone); - const branch1Source = {}; - $putByIdDirectPrivate(branch1Source, "pull", pullFunction); - $putByIdDirectPrivate(branch1Source, "cancel", $readableStreamTeeBranch1CancelFunction(teeState, stream)); + const branch1Source = { + $pull: pullFunction, + $cancel: $readableStreamTeeBranch1CancelFunction(teeState, stream), + }; - const branch2Source = {}; - $putByIdDirectPrivate(branch2Source, "pull", pullFunction); - $putByIdDirectPrivate(branch2Source, "cancel", $readableStreamTeeBranch2CancelFunction(teeState, stream)); + const branch2Source = { + $pull: pullFunction, + $cancel: $readableStreamTeeBranch2CancelFunction(teeState, stream), + }; const branch1 = new $ReadableStream(branch1Source); const branch2 = new $ReadableStream(branch2Source); $getByIdDirectPrivate(reader, "closedPromiseCapability").promise.$then(undefined, function (e) { - if (teeState.closedOrErrored) return; + const flags = teeState.flags; + if (flags & TeeStateFlags.closedOrErrored) return; $readableStreamDefaultControllerError(branch1.$readableStreamController, e); $readableStreamDefaultControllerError(branch2.$readableStreamController, e); - teeState.closedOrErrored = true; - if (!teeState.canceled1 || !teeState.canceled2) teeState.cancelPromiseCapability.resolve.$call(); + teeState.flags |= TeeStateFlags.closedOrErrored; + + if (teeState.fllags & (TeeStateFlags.canceled1 | TeeStateFlags.canceled2)) + teeState.cancelPromiseCapability.resolve.$call(); }); // Additional fields compared to the spec, as they are needed within pull/cancel functions. @@ -541,36 +556,76 @@ export function readableStreamTee(stream, shouldClone) { } export function readableStreamTeePullFunction(teeState, reader, shouldClone) { - return function () { - Promise.prototype.$then.$call($readableStreamDefaultReaderRead(reader), function (result) { - $assert($isObject(result)); - $assert(typeof result.done === "boolean"); - if (result.done && !teeState.closedOrErrored) { - if (!teeState.canceled1) $readableStreamDefaultControllerClose(teeState.branch1.$readableStreamController); - if (!teeState.canceled2) $readableStreamDefaultControllerClose(teeState.branch2.$readableStreamController); - teeState.closedOrErrored = true; - if (!teeState.canceled1 || !teeState.canceled2) teeState.cancelPromiseCapability.resolve.$call(); - } - if (teeState.closedOrErrored) return; - if (!teeState.canceled1) - $readableStreamDefaultControllerEnqueue(teeState.branch1.$readableStreamController, result.value); - if (!teeState.canceled2) - $readableStreamDefaultControllerEnqueue( - teeState.branch2.$readableStreamController, - shouldClone ? $structuredCloneForStream(result.value) : result.value, - ); - }); + "use strict"; + + const pullAlgorithm = function () { + if (teeState.flags & TeeStateFlags.reading) { + teeState.flags |= TeeStateFlags.readAgain; + return $Promise.$resolve(); + } + teeState.flags |= TeeStateFlags.reading; + $Promise.prototype.$then.$call( + $readableStreamDefaultReaderRead(reader), + function (result) { + $assert($isObject(result)); + $assert(typeof result.done === "boolean"); + const { done, value } = result; + if (done) { + // close steps. + teeState.flags &= ~TeeStateFlags.reading; + if (!(teeState.flags & TeeStateFlags.canceled1)) + $readableStreamDefaultControllerClose(teeState.branch1.$readableStreamController); + if (!(teeState.flags & TeeStateFlags.canceled2)) + $readableStreamDefaultControllerClose(teeState.branch2.$readableStreamController); + if (!(teeState.flags & TeeStateFlags.canceled1) || !(teeState.flags & TeeStateFlags.canceled2)) + teeState.cancelPromiseCapability.resolve.$call(); + return; + } + // chunk steps. + teeState.flags &= ~TeeStateFlags.readAgain; + let chunk1 = value; + let chunk2 = value; + if (!(teeState.flags & TeeStateFlags.canceled2) && shouldClone) { + try { + chunk2 = $structuredCloneForStream(value); + } catch (e) { + $readableStreamDefaultControllerError(teeState.branch1.$readableStreamController, e); + $readableStreamDefaultControllerError(teeState.branch2.$readableStreamController, e); + $readableStreamCancel(teeState.stream, e).$then( + teeState.cancelPromiseCapability.resolve, + teeState.cancelPromiseCapability.reject, + ); + return; + } + } + if (!(teeState.flags & TeeStateFlags.canceled1)) + $readableStreamDefaultControllerEnqueue(teeState.branch1.$readableStreamController, chunk1); + if (!(teeState.flags & TeeStateFlags.canceled2)) + $readableStreamDefaultControllerEnqueue(teeState.branch2.$readableStreamController, chunk2); + teeState.flags &= ~TeeStateFlags.reading; + + $Promise.$resolve().$then(() => { + if (teeState.flags & TeeStateFlags.readAgain) pullAlgorithm(); + }); + }, + () => { + // error steps. + teeState.flags &= ~TeeStateFlags.reading; + }, + ); + return $Promise.$resolve(); }; + return pullAlgorithm; } export function readableStreamTeeBranch1CancelFunction(teeState, stream) { return function (r) { - teeState.canceled1 = true; + teeState.flags |= TeeStateFlags.canceled1; teeState.reason1 = r; - if (teeState.canceled2) { + if (teeState.flags & TeeStateFlags.canceled2) { $readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).$then( - teeState.cancelPromiseCapability.$resolve, - teeState.cancelPromiseCapability.$reject, + teeState.cancelPromiseCapability.resolve, + teeState.cancelPromiseCapability.reject, ); } return teeState.cancelPromiseCapability.promise; @@ -579,12 +634,12 @@ export function readableStreamTeeBranch1CancelFunction(teeState, stream) { export function readableStreamTeeBranch2CancelFunction(teeState, stream) { return function (r) { - teeState.canceled2 = true; + teeState.flags |= TeeStateFlags.canceled2; teeState.reason2 = r; - if (teeState.canceled1) { + if (teeState.flags & TeeStateFlags.canceled1) { $readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).$then( - teeState.cancelPromiseCapability.$resolve, - teeState.cancelPromiseCapability.$reject, + teeState.cancelPromiseCapability.resolve, + teeState.cancelPromiseCapability.reject, ); } return teeState.cancelPromiseCapability.promise; diff --git a/test/js/web/fetch/body-clone.test.ts b/test/js/web/fetch/body-clone.test.ts new file mode 100644 index 0000000000000..7f8f5c9446578 --- /dev/null +++ b/test/js/web/fetch/body-clone.test.ts @@ -0,0 +1,520 @@ +import { test, expect } from "bun:test"; + +test("Request with streaming body can be cloned", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue("Hello"); + controller.enqueue(" "); + controller.enqueue("World"); + controller.close(); + }, + }); + + const request = new Request("https://example.com", { method: "POST", body: stream }); + const clonedRequest = request.clone(); + + const originalBody = await request.text(); + const clonedBody = await clonedRequest.text(); + + expect(originalBody).toBe("Hello World"); + expect(clonedBody).toBe("Hello World"); +}); + +test("Response with streaming body can be cloned", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue("Test"); + controller.enqueue(" "); + controller.enqueue("Data"); + controller.close(); + }, + }); + + const response = new Response(stream); + const clonedResponse = response.clone(); + + const originalBody = await response.text(); + const clonedBody = await clonedResponse.text(); + + expect(originalBody).toBe("Test Data"); + expect(clonedBody).toBe("Test Data"); +}); + +test("Request with large streaming body can be cloned", async () => { + let largeData = "x".repeat(1024 * 1024); // 1MB of data + let chunks = []; + for (let chunkSize = 1024; chunkSize <= 1024 * 1024; chunkSize *= 2) { + chunks.push(largeData.slice(0, chunkSize)); + } + largeData = chunks.join(""); + const stream = new ReadableStream({ + start(controller) { + for (let chunk of chunks) { + controller.enqueue(chunk); + } + controller.close(); + }, + }); + + const request = new Request("https://example.com", { method: "POST", body: stream }); + const clonedRequest = request.clone(); + + const originalBody = await request.text(); + const clonedBody = await clonedRequest.text(); + + expect(originalBody).toBe(largeData); + expect(clonedBody).toBe(largeData); +}); + +test("Request with large streaming body can be cloned (pull)", async () => { + let largeData = "x".repeat(1024 * 1024); // 1MB of data + let chunks = []; + for (let chunkSize = 1024; chunkSize <= 1024 * 1024; chunkSize *= 2) { + chunks.push(largeData.slice(0, chunkSize)); + } + largeData = chunks.join(""); + const stream = new ReadableStream({ + async pull(controller) { + await 42; + for (let chunk of chunks) { + controller.enqueue(chunk); + } + controller.close(); + }, + }); + + const request = new Request("https://example.com", { method: "POST", body: stream }); + const clonedRequest = request.clone(); + + const originalBody = await request.text(); + const clonedBody = await clonedRequest.text(); + + expect(originalBody).toBe(largeData); + expect(clonedBody).toBe(largeData); +}); + +test("Response with chunked streaming body can be cloned", async () => { + const chunks = ["Chunk1", "Chunk2", "Chunk3"]; + const stream = new ReadableStream({ + async start(controller) { + for (const chunk of chunks) { + controller.enqueue(chunk); + await new Promise(resolve => setTimeout(resolve, 10)); + } + controller.close(); + }, + }); + + const response = new Response(stream); + const clonedResponse = response.clone(); + + const originalBody = await response.text(); + const clonedBody = await clonedResponse.text(); + + expect(originalBody).toBe(chunks.join("")); + expect(clonedBody).toBe(chunks.join("")); +}); + +test("Request with streaming body can be cloned multiple times", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue("Multi"); + controller.enqueue("Clone"); + controller.enqueue("Test"); + controller.close(); + }, + }); + + const request = new Request("https://example.com", { method: "POST", body: stream }); + const clonedRequest1 = request.clone(); + const clonedRequest2 = request.clone(); + + const originalBody = await request.text(); + const clonedBody1 = await clonedRequest1.text(); + const clonedBody2 = await clonedRequest2.text(); + + expect(originalBody).toBe("MultiCloneTest"); + expect(clonedBody1).toBe("MultiCloneTest"); + expect(clonedBody2).toBe("MultiCloneTest"); +}); + +test("Request with string body can be cloned", async () => { + const body = "Hello, world!"; + const request = new Request("https://example.com", { method: "POST", body }); + const clonedRequest = request.clone(); + + const originalBody = await request.text(); + const clonedBody = await clonedRequest.text(); + + expect(originalBody).toBe(body); + expect(clonedBody).toBe(body); +}); + +test("Response with string body can be cloned", async () => { + const body = "Hello, world!"; + const response = new Response(body); + const clonedResponse = response.clone(); + + const originalBody = await response.text(); + const clonedBody = await clonedResponse.text(); + + expect(originalBody).toBe(body); + expect(clonedBody).toBe(body); +}); + +test("Request with ArrayBuffer body can be cloned", async () => { + const body = new ArrayBuffer(8); + new Uint8Array(body).set([1, 2, 3, 4, 5, 6, 7, 8]); + const request = new Request("https://example.com", { method: "POST", body }); + const clonedRequest = request.clone(); + + const originalBody = new Uint8Array(await request.arrayBuffer()); + const clonedBody = new Uint8Array(await clonedRequest.arrayBuffer()); + + expect(originalBody).toEqual(new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8])); + expect(clonedBody).toEqual(new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8])); +}); + +test("Response with ArrayBuffer body can be cloned", async () => { + const body = new ArrayBuffer(8); + new Uint8Array(body).set([1, 2, 3, 4, 5, 6, 7, 8]); + const response = new Response(body); + const clonedResponse = response.clone(); + + const originalBody = new Uint8Array(await response.arrayBuffer()); + const clonedBody = new Uint8Array(await clonedResponse.arrayBuffer()); + + expect(originalBody).toEqual(new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8])); + expect(clonedBody).toEqual(new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8])); +}); + +test("Request with Uint8Array body can be cloned", async () => { + const body = new Uint8Array([1, 2, 3, 4, 5]); + const request = new Request("https://example.com", { method: "POST", body }); + const clonedRequest = request.clone(); + + const originalBody = new Uint8Array(await request.arrayBuffer()); + const clonedBody = new Uint8Array(await clonedRequest.arrayBuffer()); + + expect(originalBody).toEqual(new Uint8Array([1, 2, 3, 4, 5])); + expect(clonedBody).toEqual(new Uint8Array([1, 2, 3, 4, 5])); +}); + +test("Response with Uint8Array body can be cloned", async () => { + const body = new Uint8Array([1, 2, 3, 4, 5]); + const response = new Response(body); + const clonedResponse = response.clone(); + + const originalBody = new Uint8Array(await response.arrayBuffer()); + const clonedBody = new Uint8Array(await clonedResponse.arrayBuffer()); + + expect(originalBody).toEqual(new Uint8Array([1, 2, 3, 4, 5])); + expect(clonedBody).toEqual(new Uint8Array([1, 2, 3, 4, 5])); +}); + +test("Request with mixed body types can be cloned", async () => { + const bodies = [ + "Hello, world!", + new ArrayBuffer(8), + new Uint8Array([1, 2, 3, 4, 5]), + new ReadableStream({ + start(controller) { + controller.enqueue("Stream"); + controller.close(); + }, + }), + ]; + + for (const body of bodies) { + const request = new Request("https://example.com", { method: "POST", body }); + const clonedRequest = request.clone(); + + let originalBody, clonedBody; + + if (typeof body === "string") { + originalBody = await request.text(); + clonedBody = await clonedRequest.text(); + } else { + originalBody = new Uint8Array(await request.arrayBuffer()); + clonedBody = new Uint8Array(await clonedRequest.arrayBuffer()); + } + + expect(originalBody).toEqual(clonedBody); + } +}); + +test("Response with mixed body types can be cloned", async () => { + const bodies = [ + "Hello, world!", + new ArrayBuffer(8), + new Uint8Array([1, 2, 3, 4, 5]), + new ReadableStream({ + start(controller) { + controller.enqueue("Stream"); + controller.close(); + }, + }), + ]; + + for (const body of bodies) { + const response = new Response(body); + const clonedResponse = response.clone(); + + let originalBody, clonedBody; + + if (typeof body === "string") { + originalBody = await response.text(); + clonedBody = await clonedResponse.text(); + } else { + originalBody = new Uint8Array(await response.arrayBuffer()); + clonedBody = new Uint8Array(await clonedResponse.arrayBuffer()); + } + + expect(originalBody).toEqual(clonedBody); + } +}); + +test("Request with non-ASCII string body can be cloned", async () => { + const body = "Hello, 世界! 🌍 Здравствуй, мир!"; + const request = new Request("https://example.com", { method: "POST", body }); + const clonedRequest = request.clone(); + + const originalBody = await request.text(); + const clonedBody = await clonedRequest.text(); + + expect(originalBody).toBe(body); + expect(clonedBody).toBe(body); +}); + +test("Response with non-ASCII string body can be cloned", async () => { + const body = "こんにちは、世界! 🌎 Bonjour, le monde!"; + const response = new Response(body); + const clonedResponse = response.clone(); + + const originalBody = await response.text(); + const clonedBody = await clonedResponse.text(); + + expect(originalBody).toBe(body); + expect(clonedBody).toBe(body); +}); + +test("Request with streaming non-ASCII body can be cloned", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue("Hello, "); + controller.enqueue("世界"); + controller.enqueue("! 🌏 "); + controller.enqueue("Olá, mundo!"); + controller.close(); + }, + }); + + const request = new Request("https://example.com", { method: "POST", body: stream }); + const clonedRequest = request.clone(); + + const originalBody = await request.text(); + const clonedBody = await clonedRequest.text(); + + expect(originalBody).toBe("Hello, 世界! 🌏 Olá, mundo!"); + expect(clonedBody).toBe("Hello, 世界! 🌏 Olá, mundo!"); +}); + +test("Response with streaming non-ASCII body can be cloned", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue("Здравствуй, "); + controller.enqueue("мир"); + controller.enqueue("! 🌍 "); + controller.enqueue("Hola, mundo!"); + controller.close(); + }, + }); + + const response = new Response(stream); + const clonedResponse = response.clone(); + + const originalBody = await response.text(); + const clonedBody = await clonedResponse.text(); + + expect(originalBody).toBe("Здравствуй, мир! 🌍 Hola, mundo!"); + expect(clonedBody).toBe("Здравствуй, мир! 🌍 Hola, mundo!"); +}); + +test("Request with mixed non-ASCII body types can be cloned", async () => { + const bodies = [ + "Hello, 世界! 🌍", + new TextEncoder().encode("こんにちは、世界! 🌎"), + new ReadableStream({ + start(controller) { + controller.enqueue("Здравствуй, "); + controller.enqueue("мир"); + controller.enqueue("! 🌏"); + controller.close(); + }, + }), + ]; + + for (const body of bodies) { + const request = new Request("https://example.com", { method: "POST", body }); + const clonedRequest = request.clone(); + + let originalBody, clonedBody; + + if (typeof body === "string") { + originalBody = await request.text(); + clonedBody = await clonedRequest.text(); + } else if (body instanceof Uint8Array) { + originalBody = new TextDecoder().decode(await request.arrayBuffer()); + clonedBody = new TextDecoder().decode(await clonedRequest.arrayBuffer()); + } else { + originalBody = await request.text(); + clonedBody = await clonedRequest.text(); + } + + expect(originalBody).toEqual(clonedBody); + } +}); + +test("ReadableStream with mixed content (starting with string) can be converted to text", async () => { + const mixedContent = [ + "Hello, 世界! 🌍", + new Uint8Array([240, 159, 140, 141]), // 🌍 emoji + new ArrayBuffer(4), + "Здравствуй, мир!", + ]; + + let index = 0; + const stream = new ReadableStream({ + async pull(controller) { + await 1; // Delay in a microtask + if (index < mixedContent.length) { + controller.enqueue(mixedContent[index++]); + } else { + controller.close(); + } + }, + }); + + const text = await Bun.readableStreamToText(stream); + expect(typeof text).toBe("string"); + expect(text).toContain("Hello, 世界!"); + expect(text).toContain("🌍"); + expect(text).toContain("Здравствуй, мир!"); +}); + +test("ReadableStream with mixed content (starting with Uint8Array) can be converted to ArrayBuffer", async () => { + const mixedContent = [ + new Uint8Array([72, 101, 108, 108, 111]), // "Hello" in ASCII + "世界! 🌍", + new ArrayBuffer(4), + "Здравствуй, мир!", + ]; + + let index = 0; + const stream = new ReadableStream({ + async pull(controller) { + await 1; // Delay in a microtask + if (index < mixedContent.length) { + controller.enqueue(mixedContent[index++]); + } else { + controller.close(); + } + }, + }); + + const arrayBuffer = await Bun.readableStreamToArrayBuffer(stream); + expect(arrayBuffer).toBeInstanceOf(ArrayBuffer); + const text = new TextDecoder().decode(arrayBuffer); + expect(text).toContain("Hello"); + expect(text).toContain("世界!"); + expect(text).toContain("🌍"); + expect(text).toContain("Здравствуй, мир!"); +}); + +test("ReadableStream with mixed content (starting with ArrayBuffer) can be converted to Uint8Array", async () => { + const mixedContent = [ + new ArrayBuffer(4), + "Hello, 世界! 🌍", + new Uint8Array([240, 159, 140, 141]), // 🌍 emoji + "Здравствуй, мир!", + ]; + + let index = 0; + const stream = new ReadableStream({ + async pull(controller) { + await 1; // Delay in a microtask + if (index < mixedContent.length) { + controller.enqueue(mixedContent[index++]); + } else { + controller.close(); + } + }, + }); + + const uint8Array = await Bun.readableStreamToBytes(stream); + expect(uint8Array).toBeInstanceOf(Uint8Array); + const text = new TextDecoder().decode(uint8Array); + expect(text).toContain("Hello, 世界!"); + expect(text).toContain("🌍"); + expect(text).toContain("Здравствуй, мир!"); +}); + +test("ReadableStream with mixed content (starting with string) can be converted to ArrayBuffer using Response", async () => { + const mixedContent = [ + "Hello, ", + "世界! ", + new Uint8Array([240, 159, 140, 141]), // 🌍 emoji + "Здравствуй, мир!", + ]; + + let index = 0; + const stream = new ReadableStream({ + async pull(controller) { + await 1; // Delay in a microtask + if (index < mixedContent.length) { + controller.enqueue(mixedContent[index++]); + } else { + controller.close(); + } + }, + }); + + const response = new Response(stream); + const arrayBuffer = await response.arrayBuffer(); + expect(arrayBuffer).toBeInstanceOf(ArrayBuffer); + const text = new TextDecoder().decode(arrayBuffer); + expect(text).toContain("Hello"); + expect(text).toContain("世界!"); + expect(text).toContain("🌍"); + expect(text).toContain("Здравствуй, мир!"); +}); + +test("ReadableStream with mixed content (starting with ArrayBuffer) can be converted to Uint8Array using Response", async () => { + const mixedContent = [ + new ArrayBuffer(4), + "Hello, 世界! 🌍", + new Uint8Array([240, 159, 140, 141]), // 🌍 emoji + "Здравствуй, мир!", + ]; + + let index = 0; + const stream = new ReadableStream({ + async pull(controller) { + await 1; // Delay in a microtask + if (index < mixedContent.length) { + controller.enqueue(mixedContent[index++]); + } else { + controller.close(); + } + }, + }); + + const response = new Response(stream); + const uint8Array = await response.bytes(); + expect(uint8Array).toBeInstanceOf(Uint8Array); + const text = new TextDecoder().decode(uint8Array); + expect(text).toStartWith("\0\0\0\0"); + expect(text).toContain("Hello, 世界!"); + expect(text).toContain("🌍"); + expect(text).toContain("Здравствуй, мир!"); +}); diff --git a/test/js/web/fetch/body-stream.test.ts b/test/js/web/fetch/body-stream.test.ts index 0fd6efe9d93bd..f42de34e02fc0 100644 --- a/test/js/web/fetch/body-stream.test.ts +++ b/test/js/web/fetch/body-stream.test.ts @@ -2,9 +2,9 @@ import { gc, ServeOptions } from "bun"; import { afterAll, describe, expect, it, test } from "bun:test"; -var port = 0; +const port = 0; -{ +for (let doClone of [true, false]) { const BodyMixin = [ Request.prototype.arrayBuffer, Request.prototype.bytes, @@ -51,7 +51,9 @@ var port = 0; describe(`Request.prototoype.${RequestPrototypeMixin.name}() ${ useRequestObject ? "fetch(req)" - : "fetch(url)" + (forceReadableStreamConversionFastPath ? " (force fast ReadableStream conversion)" : "") + : "fetch(url)" + + (forceReadableStreamConversionFastPath ? " (force fast ReadableStream conversion)" : "") + + (doClone ? " (clone)" : "") }`, () => { const inputFixture = [ [JSON.stringify("Hello World"), JSON.stringify("Hello World")], @@ -83,19 +85,23 @@ var port = 0; if (forceReadableStreamConversionFastPath) { req.body; } - var result = await RequestPrototypeMixin.call(req); - if (RequestPrototypeMixin === Request.prototype.json) { - result = JSON.stringify(result); - } - if (typeof result === "string") { - expect(result.length).toBe(name.length); - expect(result).toBe(name); - } else if (result && result instanceof Blob) { - expect(result.size).toBe(new TextEncoder().encode(name).byteLength); - expect(await result.text()).toBe(name); - } else { - expect(result.byteLength).toBe(Buffer.from(input).byteLength); - expect(Bun.SHA1.hash(result, "base64")).toBe(Bun.SHA1.hash(input, "base64")); + + var result; + for (const request of doClone ? [req.clone(), req] : [req]) { + result = await RequestPrototypeMixin.call(request); + if (RequestPrototypeMixin === Request.prototype.json) { + result = JSON.stringify(result); + } + if (typeof result === "string") { + expect(result.length).toBe(name.length); + expect(result).toBe(name); + } else if (result && result instanceof Blob) { + expect(result.size).toBe(new TextEncoder().encode(name).byteLength); + expect(await result.text()).toBe(name); + } else { + expect(result.byteLength).toBe(Buffer.from(input).byteLength); + expect(Bun.SHA1.hash(result, "base64")).toBe(Bun.SHA1.hash(input, "base64")); + } } return new Response(result, { headers: req.headers, @@ -132,15 +138,25 @@ var port = 0; response.body; } + const originalResponse = response; + + if (doClone) { + response = response.clone(); + } + expect(response.status).toBe(200); expect(response.headers.get("content-length")).toBe(String(Buffer.from(input).byteLength)); expect(response.headers.get("content-type")).toBe("text/plain"); - expect(await response.text()).toBe(name); + const initialResponseText = await response.text(); + expect(initialResponseText).toBe(name); - var promises = new Array(5); - for (let i = 0; i < 5; i++) { + if (doClone) { + expect(await originalResponse.text()).toBe(name); + } + + let promises = Array.from({ length: 5 }).map((_, i) => { if (useRequestObject) { - promises[i] = await fetch( + return fetch( new Request({ body: input, method: "POST", @@ -152,7 +168,7 @@ var port = 0; }), ); } else { - promises[i] = await fetch(url, { + return fetch(url, { body: input, method: "POST", headers: { @@ -161,20 +177,28 @@ var port = 0; }, }); } - } + }); - const results = await Promise.all(promises); - for (let i = 0; i < 5; i++) { - const response = results[i]; - if (forceReadableStreamConversionFastPath) { - response.body; - } - expect(response.status).toBe(200); - expect(response.headers.get("content-length")).toBe(String(Buffer.from(input).byteLength)); - expect(response.headers.get("content-type")).toBe("text/plain"); - expect(response.headers.get("x-counter")).toBe(String(i)); - expect(await response.text()).toBe(name); - } + promises = await Promise.all(promises); + + await Promise.all( + promises.map(async (originalResponse, i) => { + if (forceReadableStreamConversionFastPath) { + originalResponse.body; + } + + for (let response of doClone + ? [originalResponse.clone(), originalResponse] + : [originalResponse]) { + expect(response.status).toBe(200); + expect(response.headers.get("content-length")).toBe(String(Buffer.from(input).byteLength)); + expect(response.headers.get("content-type")).toBe("text/plain"); + expect(response.headers.get("x-counter")).toBe(String(i)); + const responseText = await response.text(); + expect(responseText).toBe(name); + } + }), + ); }, ); }); @@ -243,293 +267,293 @@ function gc() { Bun.gc(true); } -describe("reader", function () { - for (let forceReadableStreamConversionFastPath of [true, false]) { - for (let withDelay of [false, true]) { - try { - // - 1 byte - // - less than the InlineBlob limit - // - multiple chunks - // - backpressure - - for (let inputLength of [1, 2, 12, 95, 1024, 1024 * 1024, 1024 * 1024 * 2]) { - var bytes = new Uint8Array(inputLength); - { - const chunk = Math.min(bytes.length, 256); - for (var i = 0; i < chunk; i++) { - bytes[i] = 255 - i; +for (let doClone of [true, false]) { + describe("reader" + (doClone ? " (clone)" : ""), function () { + for (let forceReadableStreamConversionFastPath of [true, false]) { + for (let withDelay of [false, true]) { + try { + // - 1 byte + // - less than the InlineBlob limit + // - multiple chunks + // - backpressure + + for (let inputLength of [1, 2, 12, 95, 1024, 1024 * 1024, 1024 * 1024 * 2]) { + var bytes = new Uint8Array(inputLength); + { + const chunk = Math.min(bytes.length, 256); + for (var i = 0; i < chunk; i++) { + bytes[i] = 255 - i; + } } - } - if (bytes.length > 255) fillRepeating(bytes, 0, bytes.length); - - for (const huge_ of [ - bytes, - bytes.buffer, - new DataView(bytes.buffer), - new Int8Array(bytes), - new Blob([bytes]), - - new Uint16Array(bytes), - new Uint32Array(bytes), - new Float64Array(bytes), - - new Int16Array(bytes), - new Int32Array(bytes), - new Float16Array(bytes), - new Float32Array(bytes), - - // make sure we handle subarray() as expected when reading - // typed arrays from native code - new Int16Array(bytes).subarray(1), - new Int16Array(bytes).subarray(0, new Int16Array(bytes).byteLength - 1), - new Int32Array(bytes).subarray(1), - new Int32Array(bytes).subarray(0, new Int32Array(bytes).byteLength - 1), - new Float16Array(bytes).subarray(1), - new Float16Array(bytes).subarray(0, new Float16Array(bytes).byteLength - 1), - new Float32Array(bytes).subarray(1), - new Float32Array(bytes).subarray(0, new Float32Array(bytes).byteLength - 1), - new Int16Array(bytes).subarray(0, 1), - new Int32Array(bytes).subarray(0, 1), - new Float16Array(bytes).subarray(0, 1), - new Float32Array(bytes).subarray(0, 1), - ]) { - gc(); - const thisArray = huge_; - if (Number(thisArray.byteLength ?? thisArray.size) === 0) continue; - - it( - `works with ${thisArray.constructor.name}(${ - thisArray.byteLength ?? thisArray.size - }:${inputLength}) via req.body.getReader() in chunks` + - (withDelay ? " with delay" : "") + - (forceReadableStreamConversionFastPath ? " (force ReadableStream conversion)" : ""), - async () => { - var huge = thisArray; - var called = false; - gc(); - - const expectedHash = - huge instanceof Blob ? Bun.SHA1.hash(await huge.bytes(), "base64") : Bun.SHA1.hash(huge, "base64"); - const expectedSize = huge instanceof Blob ? huge.size : huge.byteLength; - - const out = await runInServer( - { - async fetch(req) { - try { - if (withDelay) await 1; - - expect(req.headers.get("x-custom")).toBe("hello"); - expect(req.headers.get("content-type")).toBe("text/plain"); - expect(req.headers.get("user-agent")).toBe(navigator.userAgent); - - gc(); - expect(req.headers.get("x-custom")).toBe("hello"); - expect(req.headers.get("content-type")).toBe("text/plain"); - expect(req.headers.get("user-agent")).toBe(navigator.userAgent); - - var reader = req.body.getReader(); - called = true; - var buffers = []; - while (true) { - var { done, value } = await reader.read(); - if (done) break; - buffers.push(value); + if (bytes.length > 255) fillRepeating(bytes, 0, bytes.length); + + for (const huge_ of [ + bytes, + bytes.buffer, + new DataView(bytes.buffer), + new Int8Array(bytes), + new Blob([bytes]), + new Float64Array(bytes), + + new Uint16Array(bytes), + new Uint32Array(bytes), + new Int16Array(bytes), + new Int32Array(bytes), + + // make sure we handle subarray() as expected when reading + // typed arrays from native code + new Int16Array(bytes).subarray(1), + new Int16Array(bytes).subarray(0, new Int16Array(bytes).byteLength - 1), + new Int32Array(bytes).subarray(1), + new Int32Array(bytes).subarray(0, new Int32Array(bytes).byteLength - 1), + new Int16Array(bytes).subarray(0, 1), + new Int32Array(bytes).subarray(0, 1), + new Float32Array(bytes).subarray(0, 1), + ]) { + const thisArray = huge_; + if (Number(thisArray.byteLength ?? thisArray.size) === 0) continue; + + it( + `works with ${thisArray.constructor.name}(${ + thisArray.byteLength ?? thisArray.size + }:${inputLength}) via req.body.getReader() in chunks` + + (withDelay ? " with delay" : "") + + (forceReadableStreamConversionFastPath ? " (force ReadableStream conversion)" : ""), + async () => { + var huge = thisArray; + var called = false; + + const expectedHash = + huge instanceof Blob ? Bun.SHA1.hash(await huge.bytes(), "base64") : Bun.SHA1.hash(huge, "base64"); + const expectedSize = huge instanceof Blob ? huge.size : huge.byteLength; + + const out = await runInServer( + { + async fetch(request) { + try { + if (withDelay) await 1; + + const run = async function (req) { + expect(req.headers.get("x-custom")).toBe("hello"); + expect(req.headers.get("content-type")).toBe("text/plain"); + expect(req.headers.get("user-agent")).toBe(navigator.userAgent); + + expect(req.headers.get("x-custom")).toBe("hello"); + expect(req.headers.get("content-type")).toBe("text/plain"); + expect(req.headers.get("user-agent")).toBe(navigator.userAgent); + + let reader = req.body.getReader(); + called = true; + let buffers = []; + while (true) { + let { done, value } = await reader.read(); + if (done) break; + buffers.push(value); + } + let out = new Blob(buffers); + expect(out.size).toBe(expectedSize); + expect(Bun.SHA1.hash(await out.arrayBuffer(), "base64")).toBe(expectedHash); + expect(req.headers.get("x-custom")).toBe("hello"); + expect(req.headers.get("content-type")).toBe("text/plain"); + expect(req.headers.get("user-agent")).toBe(navigator.userAgent); + return out; + }; + + let out; + for (let req of doClone ? [request.clone(), request] : [request]) { + out = await run(req); + } + + return new Response(out, { + headers: request.headers, + }); + } catch (e) { + console.error(e); + throw e; } - const out = new Blob(buffers); - gc(); - expect(out.size).toBe(expectedSize); - expect(Bun.SHA1.hash(await out.arrayBuffer(), "base64")).toBe(expectedHash); - expect(req.headers.get("x-custom")).toBe("hello"); - expect(req.headers.get("content-type")).toBe("text/plain"); - expect(req.headers.get("user-agent")).toBe(navigator.userAgent); - gc(); - return new Response(out, { - headers: req.headers, - }); - } catch (e) { - console.error(e); - throw e; - } - }, - }, - async url => { - gc(); - if (withDelay) await 1; - const pendingResponse = await fetch(url, { - body: huge, - method: "POST", - headers: { - "content-type": "text/plain", - "x-custom": "hello", - "x-typed-array": thisArray.constructor.name, }, - }); - if (withDelay) { - await 1; - } - const response = await pendingResponse; - if (forceReadableStreamConversionFastPath) { - response.body; - } - huge = undefined; - expect(response.status).toBe(200); - const response_body = await response.bytes(); + }, + async url => { + if (withDelay) await 1; + const pendingResponse = await fetch(url, { + body: huge, + method: "POST", + headers: { + "content-type": "text/plain", + "x-custom": "hello", + "x-typed-array": thisArray.constructor.name, + }, + }); + if (withDelay) { + await 1; + } + let response = await pendingResponse; + if (forceReadableStreamConversionFastPath) { + response.body; + } + let originalResponse = response; + if (doClone) { + response = response.clone(); + } + huge = undefined; + expect(response.status).toBe(200); + for (let body of doClone ? [response, originalResponse] : [response]) { + const response_body = await body.bytes(); + expect(response_body.byteLength).toBe(expectedSize); + expect(Bun.SHA1.hash(response_body, "base64")).toBe(expectedHash); + } - expect(response_body.byteLength).toBe(expectedSize); - expect(Bun.SHA1.hash(response_body, "base64")).toBe(expectedHash); + expect(response.headers.get("content-type")).toBe("text/plain"); + }, + ); + expect(called).toBe(true); - gc(); - expect(response.headers.get("content-type")).toBe("text/plain"); - gc(); - }, - ); - expect(called).toBe(true); - gc(); - return out; - }, - ); - - for (let isDirectStream of [true, false]) { - const positions = ["begin", "end"]; - const inner = thisArray => { - for (let position of positions) { - it( - `streaming back ${thisArray.constructor.name}(${ - thisArray.byteLength ?? thisArray.size - }:${inputLength}) starting request.body.getReader() at ${position}` + - (forceReadableStreamConversionFastPath ? " (force ReadableStream conversion)" : ""), - async () => { - var huge = thisArray; - var called = false; - gc(); - - const expectedHash = - huge instanceof Blob - ? Bun.SHA1.hash(await huge.bytes(), "base64") - : Bun.SHA1.hash(huge, "base64"); - const expectedSize = huge instanceof Blob ? huge.size : huge.byteLength; - - const out = await runInServer( - { - async fetch(req) { - try { - var reader; - - if (withDelay) await 1; - - if (position === "begin") { - reader = req.body.getReader(); - } + return out; + }, + ); - if (position === "end") { - await 1; - reader = req.body.getReader(); - } + for (let isDirectStream of [true, false]) { + const positions = ["begin", "end"]; + const inner = thisArray => { + for (let position of positions) { + it( + `streaming back ${thisArray.constructor.name}(${ + thisArray.byteLength ?? thisArray.size + }:${inputLength}) starting request.body.getReader() at ${position}` + + (forceReadableStreamConversionFastPath ? " (force ReadableStream conversion)" : ""), + async () => { + var huge = thisArray; + var called = false; + + const expectedHash = + huge instanceof Blob + ? Bun.SHA1.hash(await huge.bytes(), "base64") + : Bun.SHA1.hash(huge, "base64"); + const expectedSize = huge instanceof Blob ? huge.size : huge.byteLength; + + const out = await runInServer( + { + async fetch(request) { + try { + var reader; + + if (withDelay) await 1; + + for (let req of doClone ? [request.clone(), request] : [request]) { + if (position === "begin") { + reader = req.body.getReader(); + } - expect(req.headers.get("x-custom")).toBe("hello"); - expect(req.headers.get("content-type")).toBe("text/plain"); - expect(req.headers.get("user-agent")).toBe(navigator.userAgent); + if (position === "end") { + await 1; + reader = req.body.getReader(); + } - gc(); - expect(req.headers.get("x-custom")).toBe("hello"); - expect(req.headers.get("content-type")).toBe("text/plain"); - expect(req.headers.get("user-agent")).toBe(navigator.userAgent); + expect(req.headers.get("x-custom")).toBe("hello"); + expect(req.headers.get("content-type")).toBe("text/plain"); + expect(req.headers.get("user-agent")).toBe(navigator.userAgent); + } + + const direct = { + type: "direct", + async pull(controller) { + if (withDelay) await 1; + + while (true) { + const { done, value } = await reader.read(); + if (done) { + called = true; + controller.end(); + + return; + } + controller.write(value); + } + }, + }; + + const web = { + async start() { + if (withDelay) await 1; + }, + async pull(controller) { + while (true) { + const { done, value } = await reader.read(); + if (done) { + called = true; + controller.close(); + return; + } + controller.enqueue(value); + } + }, + }; + + return new Response(new ReadableStream(isDirectStream ? direct : web), { + headers: request.headers, + }); + } catch (e) { + console.error(e); + throw e; + } + }, + }, + async url => { + let response = await fetch(url, { + body: huge, + method: "POST", + headers: { + "content-type": "text/plain", + "x-custom": "hello", + "x-typed-array": thisArray.constructor.name, + }, + }); + huge = undefined; + expect(response.status).toBe(200); + if (forceReadableStreamConversionFastPath) { + response.body; + } - const direct = { - type: "direct", - async pull(controller) { - if (withDelay) await 1; + const originalResponse = response; + if (doClone) { + response = response.clone(); + } - while (true) { - const { done, value } = await reader.read(); - if (done) { - called = true; - controller.end(); + for (let body of doClone ? [response, originalResponse] : [response]) { + const response_body = await body.bytes(); + expect(response_body.byteLength).toBe(expectedSize); + expect(Bun.SHA1.hash(response_body, "base64")).toBe(expectedHash); + } - return; - } - controller.write(value); - } - }, - }; - - const web = { - async start() { - if (withDelay) await 1; - }, - async pull(controller) { - while (true) { - const { done, value } = await reader.read(); - if (done) { - called = true; - controller.close(); - return; - } - controller.enqueue(value); - } - }, - }; - - return new Response(new ReadableStream(isDirectStream ? direct : web), { - headers: req.headers, - }); - } catch (e) { - console.error(e); - throw e; + if (!response.headers.has("content-type")) { + console.error(Object.fromEntries(response.headers.entries())); } - }, - }, - async url => { - gc(); - const response = await fetch(url, { - body: huge, - method: "POST", - headers: { - "content-type": "text/plain", - "x-custom": "hello", - "x-typed-array": thisArray.constructor.name, - }, - }); - huge = undefined; - expect(response.status).toBe(200); - if (forceReadableStreamConversionFastPath) { - response.body; - } - const response_body = await response.bytes(); - expect(response_body.byteLength).toBe(expectedSize); - expect(Bun.SHA1.hash(response_body, "base64")).toBe(expectedHash); + expect(response.headers.get("content-type")).toBe("text/plain"); + }, + ); + expect(called).toBe(true); - gc(); - if (!response.headers.has("content-type")) { - console.error(Object.fromEntries(response.headers.entries())); - } + return out; + }, + ); + } + }; - expect(response.headers.get("content-type")).toBe("text/plain"); - gc(); - }, - ); - expect(called).toBe(true); - gc(); - return out; - }, - ); + if (isDirectStream) { + describe(" direct stream", () => inner(thisArray)); + } else { + describe("default stream", () => inner(thisArray)); } - }; - - if (isDirectStream) { - describe(" direct stream", () => inner(thisArray)); - } else { - describe("default stream", () => inner(thisArray)); } } } + } catch (e) { + console.error(e); + throw e; } - } catch (e) { - console.error(e); - throw e; } } - } -}); + }); +} diff --git a/test/js/web/fetch/stream-fast-path.test.ts b/test/js/web/fetch/stream-fast-path.test.ts index 7e856a6ef21fc..82af1d93dfc71 100644 --- a/test/js/web/fetch/stream-fast-path.test.ts +++ b/test/js/web/fetch/stream-fast-path.test.ts @@ -28,8 +28,15 @@ describe("ByteBlobLoader", () => { test("works", async () => { const stream = blob.stream(); const result = fn(stream); - console.log(Promise, result); - expect(result.then).toBeFunction(); + + // TODO: figure out why empty is wasting a microtask. + if (blob.size > 0) { + // Don't waste microticks on this. + if (result instanceof Promise) { + expect(Bun.peek.status(result)).toBe("fulfilled"); + } + } + const awaited = await result; expect(awaited).toEqual(await new Response(blob)[name]()); });