Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #13816 #13906

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/bun.js/bindings/ErrorCode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export default [
["ERR_ILLEGAL_CONSTRUCTOR", TypeError, "TypeError"],
["ERR_INVALID_URL", TypeError, "TypeError"],
["ERR_BUFFER_TOO_LARGE", RangeError, "RangeError"],
["ERR_STREAM_RELEASE_LOCK", Error, "AbortError"],
["ERR_BROTLI_INVALID_PARAM", RangeError, "RangeError"],
["ERR_UNKNOWN_ENCODING", TypeError, "TypeError"],
["ERR_INVALID_STATE", Error, "Error"],
Expand Down
25 changes: 13 additions & 12 deletions src/js/builtins/ProcessObjectInternals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ export function getStdioWriteStream(fd) {
}

export function getStdinStream(fd) {
const tty = require("node:tty");
const fs = require("node:fs");
if (!tty.isatty(fd) && fs.fstatSync(fd).isFile()) {
return new fs.ReadStream(null, { fd: fd, autoClose: false });
}

// Ideally we could use this:
// return require("node:stream")[Symbol.for("::bunternal::")]._ReadableFromWeb(Bun.stdin.stream());
// but we need to extend TTY/FS ReadStream
Expand Down Expand Up @@ -106,21 +112,12 @@ export function getStdinStream(fd) {
$getByIdDirectPrivate(native, "readableStreamController"),
"underlyingByteSource",
).$resume(false);
} catch (e) {
if (IS_BUN_DEVELOPMENT) {
// we assume this isn't possible, but because we aren't sure
// we will ignore if error during release, but make a big deal in debug
console.error(e);
$assert(!"reachable");
}
}
} catch (e) {}
}
}
}

const tty = require("node:tty");
const ReadStream = tty.isatty(fd) ? tty.ReadStream : require("node:fs").ReadStream;
const stream = new ReadStream(fd);
const stream = new tty.ReadStream(fd);

const originalOn = stream.on;

Expand Down Expand Up @@ -194,7 +191,11 @@ export function getStdinStream(fd) {
unref();
}
}
} catch (err) {
} catch (err: any) {
if (err?.code === "ERR_STREAM_RELEASE_LOCK") {
// Not a bug. Happens in unref().
return;
}
stream.destroy(err);
}
}
Expand Down
17 changes: 16 additions & 1 deletion src/js/builtins/ReadableStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ export function initializeReadableStream(

$linkTimeConstant;
export function readableStreamToArray(stream: ReadableStream): Promise<unknown[]> {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
// this is a direct stream
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
if (underlyingSource !== undefined) {
Expand All @@ -119,6 +120,7 @@ export function readableStreamToArray(stream: ReadableStream): Promise<unknown[]

$linkTimeConstant;
export function readableStreamToText(stream: ReadableStream): Promise<string> {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
// this is a direct stream
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
if (underlyingSource !== undefined) {
Expand All @@ -137,6 +139,7 @@ export function readableStreamToText(stream: ReadableStream): Promise<string> {

$linkTimeConstant;
export function readableStreamToArrayBuffer(stream: ReadableStream<ArrayBuffer>): Promise<ArrayBuffer> | ArrayBuffer {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
// this is a direct stream
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
if (underlyingSource !== undefined) {
Expand Down Expand Up @@ -216,6 +219,7 @@ export function readableStreamToArrayBuffer(stream: ReadableStream<ArrayBuffer>)

$linkTimeConstant;
export function readableStreamToBytes(stream: ReadableStream<ArrayBuffer>): Promise<Uint8Array> | Uint8Array {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
// this is a direct stream
var underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");

Expand Down Expand Up @@ -297,6 +301,7 @@ export function readableStreamToFormData(
stream: ReadableStream<ArrayBuffer>,
contentType: string | ArrayBuffer | ArrayBufferView,
): Promise<FormData> {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked"));
return Bun.readableStreamToBlob(stream).then(blob => {
return FormData.from(blob, contentType);
Expand All @@ -305,6 +310,7 @@ export function readableStreamToFormData(

$linkTimeConstant;
export function readableStreamToJSON(stream: ReadableStream): unknown {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked"));
let result = $tryUseReadableStreamBufferedFastPath(stream, "json");
if (result) {
Expand All @@ -326,6 +332,7 @@ export function readableStreamToJSON(stream: ReadableStream): unknown {

$linkTimeConstant;
export function readableStreamToBlob(stream: ReadableStream): Promise<Blob> {
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
if ($isReadableStreamLocked(stream)) return Promise.$reject($makeTypeError("ReadableStream is locked"));

return (
Expand Down Expand Up @@ -422,7 +429,15 @@ export function pipeThrough(this, streams, options) {

if ($isWritableStreamLocked(internalWritable)) throw $makeTypeError("WritableStream is locked");

$readableStreamPipeToWritableStream(this, internalWritable, preventClose, preventAbort, preventCancel, signal);
const promise = $readableStreamPipeToWritableStream(
this,
internalWritable,
preventClose,
preventAbort,
preventCancel,
signal,
);
$markPromiseAsHandled(promise);

return readable;
}
Expand Down
5 changes: 3 additions & 2 deletions src/js/builtins/ReadableStreamDefaultController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ export function initializeReadableStreamDefaultController(this, stream, underlyi
export function enqueue(this, chunk) {
if (!$isReadableStreamDefaultController(this)) throw $makeThisTypeError("ReadableStreamDefaultController", "enqueue");

if (!$readableStreamDefaultControllerCanCloseOrEnqueue(this))
throw new TypeError("ReadableStreamDefaultController is not in a state where chunk can be enqueued");
if (!$readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
throw $ERR_INVALID_STATE("ReadableStreamDefaultController is not in a state where chunk can be enqueued");
}

return $readableStreamDefaultControllerEnqueue(this, chunk);
}
Expand Down
5 changes: 1 addition & 4 deletions src/js/builtins/ReadableStreamDefaultReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,7 @@ export function releaseLock(this) {

if (!$getByIdDirectPrivate(this, "ownerReadableStream")) return;

if ($getByIdDirectPrivate(this, "readRequests")?.isNotEmpty())
throw new TypeError("There are still pending read requests, cannot release the lock");

$readableStreamReaderGenericRelease(this);
$readableStreamDefaultReaderRelease(this);
}

$getter;
Expand Down
38 changes: 27 additions & 11 deletions src/js/builtins/ReadableStreamInternals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,10 @@ export function pipeToDoReadWrite(pipeState) {
pipeState.pendingReadPromiseCapability.resolve.$call(undefined, canWrite);
if (!canWrite) return;

pipeState.pendingWritePromise = $writableStreamDefaultWriterWrite(pipeState.writer, result.value);
pipeState.pendingWritePromise = $writableStreamDefaultWriterWrite(pipeState.writer, result.value).$then(
undefined,
() => {},
);
},
e => {
pipeState.pendingReadPromiseCapability.resolve.$call(undefined, false);
Expand Down Expand Up @@ -396,7 +399,7 @@ export function pipeToClosingMustBePropagatedForward(pipeState) {
action();
return;
}
$getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").promise.$then(action, undefined);
$getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").promise.$then(action, () => {});
}

export function pipeToClosingMustBePropagatedBackward(pipeState) {
Expand Down Expand Up @@ -1367,20 +1370,18 @@ export function readableStreamError(stream, error) {

if (!reader) return;

$getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call(undefined, error);
const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise;
$markPromiseAsHandled(promise);

if ($isReadableStreamDefaultReader(reader)) {
const requests = $getByIdDirectPrivate(reader, "readRequests");
$putByIdDirectPrivate(reader, "readRequests", $createFIFO());
for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
$readableStreamDefaultReaderErrorReadRequests(reader, error);
} else {
$assert($isReadableStreamBYOBReader(reader));
const requests = $getByIdDirectPrivate(reader, "readIntoRequests");
$putByIdDirectPrivate(reader, "readIntoRequests", $createFIFO());
for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
}

$getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call(undefined, error);
const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise;
$markPromiseAsHandled(promise);
}

export function readableStreamDefaultControllerShouldCallPull(controller) {
Expand Down Expand Up @@ -1608,6 +1609,15 @@ export function isReadableStreamDisturbed(stream) {
return stream.$disturbed;
}

$visibility = "Private";
export function readableStreamDefaultReaderRelease(reader) {
$readableStreamReaderGenericRelease(reader);
$readableStreamDefaultReaderErrorReadRequests(
reader,
$ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()"),
);
}

$visibility = "Private";
export function readableStreamReaderGenericRelease(reader) {
$assert(!!$getByIdDirectPrivate(reader, "ownerReadableStream"));
Expand All @@ -1616,11 +1626,11 @@ export function readableStreamReaderGenericRelease(reader) {
if ($getByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === $streamReadable)
$getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call(
undefined,
$makeTypeError("releasing lock of reader whose stream is still in readable state"),
$ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()"),
);
else
$putByIdDirectPrivate(reader, "closedPromiseCapability", {
promise: $newHandledRejectedPromise($makeTypeError("reader released lock")),
promise: $newHandledRejectedPromise($ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()")),
});

const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise;
Expand All @@ -1636,6 +1646,12 @@ export function readableStreamReaderGenericRelease(reader) {
$putByIdDirectPrivate(reader, "ownerReadableStream", undefined);
}

export function readableStreamDefaultReaderErrorReadRequests(reader, error) {
const requests = $getByIdDirectPrivate(reader, "readRequests");
$putByIdDirectPrivate(reader, "readRequests", $createFIFO());
for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
}

export function readableStreamDefaultControllerCanCloseOrEnqueue(controller) {
if ($getByIdDirectPrivate(controller, "closeRequested")) {
return false;
Expand Down
88 changes: 88 additions & 0 deletions test/js/web/streams/streams.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,55 @@ it("ReadableStream for empty file closes immediately", async () => {
expect(chunks.length).toBe(0);
});

it("ReadableStream errors the stream on pull rejection", async () => {
let stream = new ReadableStream({
pull(controller) {
return Promise.reject("pull rejected");
},
});

let reader = stream.getReader();
let closed = reader.closed.catch(err => `closed: ${err}`);
let read = reader.read().catch(err => `read: ${err}`);
expect(await Promise.race([closed, read])).toBe("closed: pull rejected");
expect(await read).toBe("read: pull rejected");
});

it("ReadableStream rejects pending reads when the lock is released", async () => {
let { resolve, promise } = Promise.withResolvers();
let stream = new ReadableStream({
async pull(controller) {
controller.enqueue("123");
await promise;
controller.enqueue("456");
controller.close();
},
});

let reader = stream.getReader();
expect((await reader.read()).value).toBe("123");

let read = reader.read();
reader.releaseLock();
expect(read).rejects.toThrow(
expect.objectContaining({
name: "AbortError",
code: "ERR_STREAM_RELEASE_LOCK",
}),
);
expect(reader.closed).rejects.toThrow(
expect.objectContaining({
name: "AbortError",
code: "ERR_STREAM_RELEASE_LOCK",
}),
);

resolve();

reader = stream.getReader();
expect((await reader.read()).value).toBe("456");
});

it("new Response(stream).arrayBuffer() (bytes)", async () => {
var queue = [Buffer.from("abdefgh")];
var stream = new ReadableStream({
Expand Down Expand Up @@ -1055,3 +1104,42 @@ it("fs.createReadStream(filename) should be able to break inside async loop", as
expect(true).toBe(true);
}
});

it("pipeTo doesn't cause unhandled rejections on readable errors", async () => {
// https://github.com/WebKit/WebKit/blob/3a75b5d2de94aa396a99b454ac47f3be9e0dc726/LayoutTests/streams/pipeTo-unhandled-promise.html
let unhandledRejectionCaught = false;

const catchUnhandledRejection = () => {
unhandledRejectionCaught = true;
};
process.on("unhandledRejection", catchUnhandledRejection);

const writable = new WritableStream();
const readable = new ReadableStream({ start: c => c.error("error") });
readable.pipeTo(writable).catch(() => {});

await Bun.sleep(15);

process.off("unhandledRejection", catchUnhandledRejection);

expect(unhandledRejectionCaught).toBe(false);
});

it("pipeThrough doesn't cause unhandled rejections on readable errors", async () => {
let unhandledRejectionCaught = false;

const catchUnhandledRejection = () => {
unhandledRejectionCaught = true;
};
process.on("unhandledRejection", catchUnhandledRejection);

const readable = new ReadableStream({ start: c => c.error("error") });
const ts = new TransformStream();
readable.pipeThrough(ts);

await Bun.sleep(15);

process.off("unhandledRejection", catchUnhandledRejection);

expect(unhandledRejectionCaught).toBe(false);
});