From 77aa957b6e8acb6ac80c06790d7ed88f067fd70a Mon Sep 17 00:00:00 2001
From: James M Snell
Date: Tue, 19 Sep 2023 07:03:28 -0700
Subject: [PATCH] More optimization of internal.c++ AllReader (#1167)

---
 src/workerd/api/streams/internal-test.c++ | 189 ++++++++++++++++++++++
 src/workerd/api/streams/internal.c++      | 124 ++++++++++++--
 2 files changed, 296 insertions(+), 17 deletions(-)
 create mode 100644 src/workerd/api/streams/internal-test.c++

diff --git a/src/workerd/api/streams/internal-test.c++ b/src/workerd/api/streams/internal-test.c++
new file mode 100644
index 00000000000..267680566c5
--- /dev/null
+++ b/src/workerd/api/streams/internal-test.c++
@@ -0,0 +1,189 @@
+// Copyright (c) 2017-2022 Cloudflare, Inc.
+// Licensed under the Apache 2.0 license found in the LICENSE file or at:
+// https://opensource.org/licenses/Apache-2.0
+
+#include "internal.h"
+#include "readable.h"
+#include "writable.h"
+#include <kj/async.h>
+#include <kj/test.h>
+#include <openssl/rand.h>
+
+namespace workerd::api {
+namespace {
+
+template <size_t size>
+class FooStream : public ReadableStreamSource {
+public:
+  FooStream() : ptr(&data[0]), remaining_(size) {
+    KJ_ASSERT(RAND_bytes(data, size) == 1);
+  }
+  kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) {
+    maxMaxBytesSeen_ = kj::max(maxMaxBytesSeen_, maxBytes);
+    numreads_++;
+    if (remaining_ == 0) return (size_t)0;
+    KJ_ASSERT(minBytes == maxBytes);
+    KJ_ASSERT(maxBytes <= size);
+    auto amount = kj::min(remaining_, maxBytes);
+    memcpy(buffer, ptr, amount);
+    ptr += amount;
+    remaining_ -= amount;
+    return amount;
+  }
+
+  kj::ArrayPtr<uint8_t> buf() { return data; }
+
+  size_t remaining() { return remaining_; }
+
+  size_t numreads() { return numreads_; }
+
+  size_t maxMaxBytesSeen() { return maxMaxBytesSeen_; }
+
+private:
+  uint8_t data[size];
+  uint8_t* ptr;
+  size_t remaining_;
+  size_t numreads_ = 0;
+  size_t maxMaxBytesSeen_ = 0;
+};
+
+template <size_t size>
+class BarStream : public FooStream<size> {
+public:
+  kj::Maybe<uint64_t> tryGetLength(StreamEncoding encoding) {
+    return size;
+  }
+};
+
+KJ_TEST("test") {
+  kj::EventLoop loop;
+  kj::WaitScope waitScope(loop);
+
+  // In this first case, the stream does not report a length. The maximum
+  // number of reads should be 3, and each allocation should be 4096 bytes.
+  FooStream<10000> stream;
+
+  stream.readAllBytes(10001).then([&](auto bytes) {
+    KJ_ASSERT(bytes.size() == 10000);
+    KJ_ASSERT(memcmp(bytes.begin(), stream.buf().begin(), 10000) == 0);
+  }).wait(waitScope);
+
+  KJ_ASSERT(stream.numreads() == 3);
+  KJ_ASSERT(stream.maxMaxBytesSeen() == 4096);
+}
+
+KJ_TEST("test (text)") {
+  kj::EventLoop loop;
+  kj::WaitScope waitScope(loop);
+
+  // As in the first case, the stream does not report a length. The maximum
+  // number of reads should be 3, and each allocation should be 4096 bytes.
+  FooStream<10000> stream;
+
+  stream.readAllText(10001).then([&](auto bytes) {
+    KJ_ASSERT(bytes.size() == 10000);
+    KJ_ASSERT(memcmp(bytes.begin(), stream.buf().begin(), 10000) == 0);
+  }).wait(waitScope);
+
+  KJ_ASSERT(stream.numreads() == 3);
+  KJ_ASSERT(stream.maxMaxBytesSeen() == 4096);
+}
+
+KJ_TEST("test2") {
+  kj::EventLoop loop;
+  kj::WaitScope waitScope(loop);
+
+  // In this second case, the stream does report a length, so we should see only
+  // one full-size read, plus a short follow-up read that observes EOF (two reads
+  // in total).
+  BarStream<10000> stream;
+
+  stream.readAllBytes(10001).then([&](auto bytes) {
+    KJ_ASSERT(bytes.size() == 10000);
+    KJ_ASSERT(memcmp(bytes.begin(), stream.buf().begin(), 10000) == 0);
+  }).wait(waitScope);
+
+  KJ_ASSERT(stream.numreads() == 2);
+  KJ_ASSERT(stream.maxMaxBytesSeen() == 10000);
+}
+
+KJ_TEST("test2 (text)") {
+  kj::EventLoop loop;
+  kj::WaitScope waitScope(loop);
+
+  // As in the previous case, the stream does report a length, so we should see only
+  // one full-size read, plus a short follow-up read that observes EOF (two reads
+  // in total).
+  BarStream<10000> stream;
+
+  stream.readAllText(10001).then([&](auto bytes) {
+    KJ_ASSERT(bytes.size() == 10000);
+    KJ_ASSERT(memcmp(bytes.begin(), stream.buf().begin(), 10000) == 0);
+  }).wait(waitScope);
+
+  KJ_ASSERT(stream.numreads() == 2);
+  KJ_ASSERT(stream.maxMaxBytesSeen() == 10000);
+}
+
+KJ_TEST("zero-length stream") {
+  kj::EventLoop loop;
+  kj::WaitScope waitScope(loop);
+
+  class Zero : public ReadableStreamSource {
+  public:
+    kj::Promise<size_t> tryRead(void*, size_t, size_t) {
+      return (size_t)0;
+    }
+    kj::Maybe<uint64_t> tryGetLength(StreamEncoding encoding) {
+      return (size_t)0;
+    }
+  };
+
+  Zero zero;
+  zero.readAllBytes(10).then([&](kj::Array<kj::byte> bytes) {
+    KJ_ASSERT(bytes.size() == 0);
+  }).wait(waitScope);
+}
+
+KJ_TEST("lying stream") {
+  kj::EventLoop loop;
+  kj::WaitScope waitScope(loop);
+
+  class Dishonest : public FooStream<10000> {
+  public:
+    kj::Maybe<uint64_t> tryGetLength(StreamEncoding encoding) {
+      return (size_t)10;
+    }
+  };
+
+  Dishonest stream;
+  stream.readAllBytes(10001).then([&](kj::Array<kj::byte> bytes) {
+    // The stream lies! It says there are only 10 bytes but there are more.
+    // Oh well, we at least make sure we get the right result.
+    KJ_ASSERT(bytes.size() == 10000);
+  }).wait(waitScope);
+
+  KJ_ASSERT(stream.numreads() == 1001);
+  KJ_ASSERT(stream.maxMaxBytesSeen() == 10);
+}
+
+KJ_TEST("honest small stream") {
+  kj::EventLoop loop;
+  kj::WaitScope waitScope(loop);
+
+  class HonestSmall : public FooStream<100> {
+  public:
+    kj::Maybe<uint64_t> tryGetLength(StreamEncoding encoding) {
+      return (size_t)100;
+    }
+  };
+
+  HonestSmall stream;
+  stream.readAllBytes(1001).then([&](kj::Array<kj::byte> bytes) {
+    KJ_ASSERT(bytes.size() == 100);
+  }).wait(waitScope);
+
+  KJ_ASSERT(stream.numreads() == 2);
+  KJ_ASSERT(stream.maxMaxBytesSeen() == 100);
+}
+
+} // namespace
+} // namespace workerd::api
diff --git a/src/workerd/api/streams/internal.c++ b/src/workerd/api/streams/internal.c++
index 96b8e8252ad..88bfb229a2a 100644
--- a/src/workerd/api/streams/internal.c++
+++ b/src/workerd/api/streams/internal.c++
@@ -78,35 +78,125 @@ private:
   template <typename T>
   kj::Promise<kj::Array<T>> read(ReadOption option = ReadOption::NONE) {
+    // There are a few complexities in this operation that make it difficult to completely
+    // optimize. The most important is that even if a stream reports an expected length
+    // using tryGetLength, we really don't know how much data the stream will produce until
+    // we try to read it. The only signal we have that the stream is done producing data
+    // is a zero-length result from tryRead. Unfortunately, we have to allocate a buffer
+    // in advance of calling tryRead, so we have to guess a bit at the size of the buffer
+    // to allocate.
+    //
+    // In the previous implementation of this method, we would just blindly allocate a
+    // 4096-byte buffer on every iteration, limiting each read to a maximum of 4096 bytes.
+    // This works fine for streams producing a small amount of data but risks requiring a
+    // greater number of loop iterations and small allocations for streams that produce
+    // larger amounts of data.
+    // Also, in the previous implementation, every loop iteration would allocate a new
+    // buffer regardless of how much of the previous allocation was actually used -- so a
+    // stream that produces only 4000 bytes total but provides only 10 bytes per iteration
+    // would end up with 400 reads and 400 4096-byte allocations. Doh! Fortunately our
+    // stream implementations tend to be a bit smarter than that, but it's still a worst
+    // case that is better avoided.
+    //
+    // So this implementation does things a bit differently.
+    // First, we check whether the stream can give an estimate of how much data it
+    // expects to produce. If that length is within a given threshold, then in the best
+    // case we can perform the entire read with at most two allocations and two calls to
+    // tryRead. The first allocation will be for the entire expected size of the stream,
+    // which the first tryRead will attempt to fulfill completely. In the best case the
+    // stream provides all of the data. The next allocation would be smaller and would
+    // end up resulting in a zero-length read signaling that we are done. Hooray!
+    //
+    // Not everything can be the best-case scenario, though. If our first tryRead does
+    // not fully consume the stream or fully fill the destination buffer, we're going to
+    // need to try again. It is possible that the new allocation in the next iteration
+    // will be wasted if the stream doesn't have any more data, so it's important for us
+    // to be conservative with that allocation. If the running total of data we've seen
+    // so far is equal to or greater than the expected total length of the stream, then
+    // the most likely case is that the next read will be zero-length -- but unfortunately
+    // we can't know for sure! So for this we fall back to a more conservative allocation:
+    // either MIN_BUFFER_CHUNK (1024 bytes) or the calculated amountToRead, whichever is
+    // lower.
+
     kj::Vector<kj::Array<kj::byte>> parts;
     uint64_t runningTotal = 0;
-    static constexpr size_t DEFAULT_BUFFER_CHUNK = 4096;
-    static constexpr size_t MAX_BUFFER_CHUNK = DEFAULT_BUFFER_CHUNK * 4;
+    static constexpr uint64_t MIN_BUFFER_CHUNK = 1024;
+    static constexpr uint64_t DEFAULT_BUFFER_CHUNK = 4096;
+    static constexpr uint64_t MAX_BUFFER_CHUNK = DEFAULT_BUFFER_CHUNK * 4;
 
     // If we know in advance how much data we'll be reading, then we can attempt to
     // optimize the loop here by setting the value specifically so we are only
-    // allocating once. But, to be safe, let's enforce an upper bound on each allocation
-    // even if we do know the total.
-    size_t amountToRead = kj::min(MAX_BUFFER_CHUNK,
-        input.tryGetLength(StreamEncoding::IDENTITY).orDefault(DEFAULT_BUFFER_CHUNK));
-
+    // allocating at most twice. But, to be safe, let's enforce an upper bound on each
+    // allocation even if we do know the total.
+    kj::Maybe<uint64_t> maybeLength = input.tryGetLength(StreamEncoding::IDENTITY);
+
+    // amountToRead is the regular allocation size we'll use right up until we've read
+    // the expected number of bytes (if known). It is calculated as the minimum of
+    // (limit, MAX_BUFFER_CHUNK, maybeLength or DEFAULT_BUFFER_CHUNK). In the best-case
+    // scenario this lets us read the entire stream in one go, provided the amount of
+    // data is small enough and the stream is well behaved.
+    // If the stream does report a length, then once we've read that number of bytes we
+    // fall back to the smaller, more conservative allocation (MIN_BUFFER_CHUNK) described
+    // above.
+    uint64_t amountToRead = kj::min(limit,
+        kj::min(MAX_BUFFER_CHUNK, maybeLength.orDefault(DEFAULT_BUFFER_CHUNK)));
+    // amountToRead can be zero if the stream reported a zero length. While the stream
+    // could be lying about its length, let's skip reading anything in this case.
     if (amountToRead > 0) {
       for (;;) {
-        // TODO(perf): We can likely further optimize this loop by checking to see
-        // how much of the buffer was filled and using the remaining buffer space if
-        // it is not completely filled by the previous iteration. Doing so makes this
-        // loop a bit more complicated tho, so for now let's keep things simple.
         auto bytes = kj::heapArray<kj::byte>(amountToRead);
-        size_t amount = co_await input.tryRead(bytes.begin(), 1, bytes.size());
+        // Note that we're passing amountToRead as the *minBytes* here, so tryRead should
+        // attempt to fill the entire buffer. If it doesn't, the implication is that the
+        // stream has nothing more to give us.
+        uint64_t amount = co_await input.tryRead(bytes.begin(), amountToRead, amountToRead);
+        KJ_DASSERT(amount <= amountToRead);
 
-        if (amount == 0) {
+        runningTotal += amount;
+        JSG_REQUIRE(runningTotal < limit, TypeError, "Memory limit exceeded before EOF.");
+
+        if (amount < amountToRead) {
+          // The stream has indicated that we're all done by returning a value less than
+          // the full buffer length.
+          // It is possible (even likely) that at least some data was written to the
+          // buffer, in which case we want to add that subset to the parts list here
+          // before we exit the loop.
+          if (amount > 0) {
+            parts.add(bytes.slice(0, amount).attach(kj::mv(bytes)));
+          }
           break;
         }
 
-        runningTotal += amount;
-        JSG_REQUIRE(runningTotal < limit, TypeError, "Memory limit exceeded before EOF.");
-        parts.add(bytes.slice(0, amount).attach(kj::mv(bytes)));
-      };
+        // Because we specify minBytes equal to maxBytes in the tryRead above, we should
+        // only get here if the buffer was completely filled by the read. If it wasn't
+        // completely filled, that is an indication that the stream is complete, which is
+        // handled above.
+        KJ_DASSERT(amount == bytes.size());
+        parts.add(kj::mv(bytes));
+
+        // If the stream provided an expected length and our running total is equal to
+        // or greater than that length, then we assume we're done.
+        KJ_IF_SOME(length, maybeLength) {
+          if (runningTotal >= length) {
+            // We've read everything we expect to read, but some streams need to be read
+            // completely in order to finish properly, and other streams might lie about
+            // their length (although they shouldn't). Sigh. So we make the next allocation
+            // potentially smaller and keep reading until we get a zero-length read. In the
+            // best case the next read is zero-length, but we have to try, which requires
+            // at least one additional (potentially wasted) allocation. (If we don't, there
+            // are multiple test failures.)
+            amountToRead = kj::min(MIN_BUFFER_CHUNK, amountToRead);
+            continue;
+          }
+        }
+      }
+    }
+
+    KJ_IF_SOME(length, maybeLength) {
+      if (runningTotal > length) {
+        // Realistically, runningTotal should never be more than length, so we emit a
+        // warning if it is, just so we know. It would be indicative of a bug somewhere
+        // in the implementation.
+        KJ_LOG(WARNING, "ReadableStream provided more data than advertised", runningTotal, length);
+      }
     }
 
     if (option == ReadOption::NULL_TERMINATE) {
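
Note: the allocation-size policy described in the long comment above can be illustrated in isolation. The sketch below is illustrative only and is not part of the patch; the helper name pickChunkSize and its free-function form are hypothetical (in the actual change this logic is inlined in AllReader's read loop), and it relies only on kj primitives that already appear in this diff (kj::min, kj::Maybe::orDefault, KJ_IF_SOME).

// Illustrative sketch (not part of the patch): the chunk-size decision pulled out
// into a standalone helper for clarity.
#include <cstdint>
#include <kj/common.h>

namespace {

constexpr uint64_t MIN_BUFFER_CHUNK = 1024;
constexpr uint64_t DEFAULT_BUFFER_CHUNK = 4096;
constexpr uint64_t MAX_BUFFER_CHUNK = DEFAULT_BUFFER_CHUNK * 4;

// Decide how large the next read buffer should be.
// * Before the advertised length (if any) has been consumed: allocate enough to read
//   the whole expected stream in one go, capped by the caller's limit and MAX_BUFFER_CHUNK.
// * Once the advertised length has been reached: the next read is most likely zero-length,
//   so fall back to a small, conservative allocation.
uint64_t pickChunkSize(kj::Maybe<uint64_t> maybeLength, uint64_t limit, uint64_t runningTotal) {
  uint64_t base = kj::min(limit,
      kj::min(MAX_BUFFER_CHUNK, maybeLength.orDefault(DEFAULT_BUFFER_CHUNK)));
  KJ_IF_SOME(length, maybeLength) {
    if (runningTotal >= length) {
      return kj::min(MIN_BUFFER_CHUNK, base);
    }
  }
  return base;
}

}  // namespace

Under this policy, the BarStream<10000> case in the test above should take one 10000-byte read followed by a single 1024-byte read that observes EOF, which is what the numreads() == 2 assertion checks.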