Skip to content

Commit

Permalink
Merge pull request #3231 from cloudflare/jphillips/writable-stream-me…
Browse files Browse the repository at this point in the history
…mory-opt-2

Moar WritableStream memory optimization
  • Loading branch information
jp4a50 authored Dec 11, 2024
2 parents 7518bb8 + 8bbcc9b commit 7fcd9e5
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 27 deletions.
4 changes: 2 additions & 2 deletions src/workerd/api/streams/common.c++
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ void WritableStreamController::PendingAbort::fail(jsg::Lock& js, v8::Local<v8::V
maybeRejectPromise<void>(js, resolver, reason);
}

kj::Maybe<WritableStreamController::PendingAbort> WritableStreamController::PendingAbort::dequeue(
kj::Maybe<WritableStreamController::PendingAbort>& maybePendingAbort) {
kj::Maybe<kj::Own<WritableStreamController::PendingAbort>> WritableStreamController::PendingAbort::
dequeue(kj::Maybe<kj::Own<WritableStreamController::PendingAbort>>& maybePendingAbort) {
return kj::mv(maybePendingAbort);
}

Expand Down
3 changes: 2 additions & 1 deletion src/workerd/api/streams/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,8 @@ class WritableStreamController {
visitor.visit(resolver, promise, reason);
}

static kj::Maybe<PendingAbort> dequeue(kj::Maybe<PendingAbort>& maybePendingAbort);
static kj::Maybe<kj::Own<PendingAbort>> dequeue(
kj::Maybe<kj::Own<PendingAbort>>& maybePendingAbort);

JSG_MEMORY_INFO(PendingAbort) {
tracker.trackField("resolver", resolver);
Expand Down
22 changes: 12 additions & 10 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1126,8 +1126,8 @@ jsg::Promise<void> WritableStreamInternalController::doAbort(
// If there is already an abort pending, return that pending promise
// instead of trying to schedule another.
KJ_IF_SOME(pendingAbort, maybePendingAbort) {
pendingAbort.reject = options.reject;
auto promise = pendingAbort.whenResolved(js);
pendingAbort->reject = options.reject;
auto promise = pendingAbort->whenResolved(js);
if (options.handled) {
promise.markAsHandled(js);
}
Expand Down Expand Up @@ -1156,8 +1156,8 @@ jsg::Promise<void> WritableStreamInternalController::doAbort(
: js.resolvedPromise();
}

maybePendingAbort = PendingAbort(js, reason, options.reject);
auto promise = KJ_ASSERT_NONNULL(maybePendingAbort).whenResolved(js);
maybePendingAbort = kj::heap<PendingAbort>(js, reason, options.reject);
auto promise = KJ_ASSERT_NONNULL(maybePendingAbort)->whenResolved(js);
if (options.handled) {
promise.markAsHandled(js);
}
Expand Down Expand Up @@ -1473,7 +1473,7 @@ jsg::Promise<void> WritableStreamInternalController::writeLoop(

void WritableStreamInternalController::finishClose(jsg::Lock& js) {
KJ_IF_SOME(pendingAbort, PendingAbort::dequeue(maybePendingAbort)) {
pendingAbort.complete(js);
pendingAbort->complete(js);
}

doClose(js);
Expand All @@ -1484,7 +1484,7 @@ void WritableStreamInternalController::finishError(jsg::Lock& js, v8::Local<v8::
// In this case, and only this case, we ignore any pending rejection
// that may be stored in the pendingAbort. The current exception takes
// precedence.
pendingAbort.fail(js, reason);
pendingAbort->fail(js, reason);
}

doError(js, reason);
Expand Down Expand Up @@ -1551,10 +1551,10 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
const auto maybeAbort = [this](jsg::Lock& js, auto& request) -> bool {
auto& writable = KJ_ASSERT_NONNULL(state.tryGet<IoOwn<Writable>>());
KJ_IF_SOME(pendingAbort, WritableStreamController::PendingAbort::dequeue(maybePendingAbort)) {
auto ex = js.exceptionToKj(pendingAbort.reason.addRef(js));
auto ex = js.exceptionToKj(pendingAbort->reason.addRef(js));
writable->abort(kj::mv(ex));
drain(js, pendingAbort.reason.getHandle(js));
pendingAbort.complete(js);
drain(js, pendingAbort->reason.getHandle(js));
pendingAbort->complete(js);
return true;
}
return false;
Expand Down Expand Up @@ -2017,7 +2017,9 @@ void WritableStreamInternalController::visitForGc(jsg::GcVisitor& visitor) {
KJ_IF_SOME(locked, writeState.tryGet<WriterLocked>()) {
visitor.visit(locked);
}
visitor.visit(maybePendingAbort);
KJ_IF_SOME(pendingAbort, maybePendingAbort) {
visitor.visit(*pendingAbort);
}
}

void ReadableStreamInternalController::visitForGc(jsg::GcVisitor& visitor) {
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/streams/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ class WritableStreamInternalController: public WritableStreamController {

kj::Maybe<kj::Own<ByteStreamObserver>> observer;

kj::Maybe<PendingAbort> maybePendingAbort;
kj::Maybe<kj::Own<PendingAbort>> maybePendingAbort;

uint64_t currentWriteBufferSize = 0;
bool warnAboutExcessiveBackpressure = true;
Expand Down
27 changes: 15 additions & 12 deletions src/workerd/api/streams/standard.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,7 @@ jsg::Promise<void> WritableImpl<Self>::abort(
KJ_IF_SOME(pendingAbort, maybePendingAbort) {
// Notice here that, per the spec, the reason given in this call of abort is
// intentionally ignored if there is already an abort pending.
return pendingAbort.whenResolved(js);
return pendingAbort->whenResolved(js);
}

bool wasAlreadyErroring = false;
Expand All @@ -1164,8 +1164,8 @@ jsg::Promise<void> WritableImpl<Self>::abort(

KJ_DEFER(if (!wasAlreadyErroring) { startErroring(js, kj::mv(self), reason); });

maybePendingAbort = PendingAbort(js, reason, wasAlreadyErroring);
return KJ_ASSERT_NONNULL(maybePendingAbort).whenResolved(js);
maybePendingAbort = kj::heap<PendingAbort>(js, reason, wasAlreadyErroring);
return KJ_ASSERT_NONNULL(maybePendingAbort)->whenResolved(js);
}

template <typename Self>
Expand Down Expand Up @@ -1328,22 +1328,22 @@ void WritableImpl<Self>::finishErroring(jsg::Lock& js, jsg::Ref<Self> self) {
KJ_ASSERT(writeRequests.empty());

KJ_IF_SOME(pendingAbort, maybePendingAbort) {
if (pendingAbort.reject) {
pendingAbort.fail(js, reason);
if (pendingAbort->reject) {
pendingAbort->fail(js, reason);
return rejectCloseAndClosedPromiseIfNeeded(js);
}

auto onSuccess = JSG_VISITABLE_LAMBDA((this, self = self.addRef()), (self), (jsg::Lock& js) {
auto& pendingAbort = KJ_ASSERT_NONNULL(maybePendingAbort);
pendingAbort.reject = false;
pendingAbort.complete(js);
pendingAbort->reject = false;
pendingAbort->complete(js);
rejectCloseAndClosedPromiseIfNeeded(js);
});

auto onFailure = JSG_VISITABLE_LAMBDA(
(this, self = self.addRef()), (self), (jsg::Lock& js, jsg::Value reason) {
auto& pendingAbort = KJ_ASSERT_NONNULL(maybePendingAbort);
pendingAbort.fail(js, reason.getHandle(js));
pendingAbort->fail(js, reason.getHandle(js));
rejectCloseAndClosedPromiseIfNeeded(js);
});

Expand All @@ -1364,7 +1364,7 @@ void WritableImpl<Self>::finishInFlightClose(
maybeRejectPromise<void>(js, inFlightClose, reason);

KJ_IF_SOME(pendingAbort, PendingAbort::dequeue(maybePendingAbort)) {
pendingAbort.fail(js, reason);
pendingAbort->fail(js, reason);
}

return dealWithRejection(js, kj::mv(self), reason);
Expand All @@ -1374,8 +1374,8 @@ void WritableImpl<Self>::finishInFlightClose(

if (state.template is<StreamStates::Erroring>()) {
KJ_IF_SOME(pendingAbort, PendingAbort::dequeue(maybePendingAbort)) {
pendingAbort.reject = false;
pendingAbort.complete(js);
pendingAbort->reject = false;
pendingAbort->complete(js);
}
}
KJ_ASSERT(maybePendingAbort == kj::none);
Expand Down Expand Up @@ -1571,7 +1571,10 @@ void WritableImpl<Self>::visitForGc(jsg::GcVisitor& visitor) {
visitor.visit(erroring.reason);
}
}
visitor.visit(inFlightWrite, inFlightClose, closeRequest, algorithms, signal, maybePendingAbort);
visitor.visit(inFlightWrite, inFlightClose, closeRequest, algorithms, signal);
KJ_IF_SOME(pendingAbort, maybePendingAbort) {
visitor.visit(*pendingAbort);
}
visitor.visitAll(writeRequests);
}

Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/streams/standard.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ class WritableImpl {
kj::Maybe<WriteRequest> inFlightWrite;
kj::Maybe<jsg::Promise<void>::Resolver> inFlightClose;
kj::Maybe<jsg::Promise<void>::Resolver> closeRequest;
kj::Maybe<PendingAbort> maybePendingAbort;
kj::Maybe<kj::Own<PendingAbort>> maybePendingAbort;

friend Self;
};
Expand Down

0 comments on commit 7fcd9e5

Please sign in to comment.