Skip to content

Commit 77aa957

Browse files
authored
More optimization of internal.c++ AllReader (#1167)
1 parent 9049b9e commit 77aa957

File tree

2 files changed

+296
-17
lines changed

2 files changed

+296
-17
lines changed
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
// Copyright (c) 2017-2022 Cloudflare, Inc.
2+
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
3+
// https://opensource.org/licenses/Apache-2.0
4+
5+
#include "internal.h"
6+
#include "readable.h"
7+
#include "writable.h"
8+
#include <workerd/jsg/jsg.h>
9+
#include <workerd/jsg/jsg-test.h>
10+
#include <openssl/rand.h>
11+
12+
namespace workerd::api {
13+
namespace {
14+
15+
template <int size>
16+
class FooStream : public ReadableStreamSource {
17+
public:
18+
FooStream() : ptr(&data[0]), remaining_(size) {
19+
KJ_ASSERT(RAND_bytes(data, size) == 1);
20+
}
21+
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) {
22+
maxMaxBytesSeen_ = kj::max(maxMaxBytesSeen_, maxBytes);
23+
numreads_++;
24+
if (remaining_ == 0) return (size_t)0;
25+
KJ_ASSERT(minBytes == maxBytes);
26+
KJ_ASSERT(maxBytes <= size);
27+
auto amount = kj::min(remaining_, maxBytes);
28+
memcpy(buffer, ptr, amount);
29+
ptr += amount;
30+
remaining_ -= amount;
31+
return amount;
32+
}
33+
34+
kj::ArrayPtr<uint8_t> buf() { return data; }
35+
36+
size_t remaining() { return remaining_; }
37+
38+
size_t numreads() { return numreads_; }
39+
40+
size_t maxMaxBytesSeen() { return maxMaxBytesSeen_; }
41+
42+
private:
43+
uint8_t data[size];
44+
uint8_t* ptr;
45+
size_t remaining_;
46+
size_t numreads_ = 0;
47+
size_t maxMaxBytesSeen_ = 0;
48+
};
49+
50+
template <int size>
51+
class BarStream : public FooStream<size> {
52+
public:
53+
kj::Maybe<uint64_t> tryGetLength(StreamEncoding encoding) {
54+
return size;
55+
}
56+
};
57+
58+
KJ_TEST("test") {
59+
kj::EventLoop loop;
60+
kj::WaitScope waitScope(loop);
61+
62+
// In this first case, the stream does not report a length. The maximum
63+
// number of reads should be 3, and each allocation should be 4096
64+
FooStream<10000> stream;
65+
66+
stream.readAllBytes(10001).then([&](auto bytes) {
67+
KJ_ASSERT(bytes.size() == 10000);
68+
KJ_ASSERT(memcmp(bytes.begin(), stream.buf().begin(), 10000) == 0);
69+
}).wait(waitScope);
70+
71+
KJ_ASSERT(stream.numreads() == 3);
72+
KJ_ASSERT(stream.maxMaxBytesSeen() == 4096);
73+
}
74+
75+
KJ_TEST("test (text)") {
76+
kj::EventLoop loop;
77+
kj::WaitScope waitScope(loop);
78+
79+
// In this first case, the stream does not report a length. The maximum
80+
// number of reads should be 3, and each allocation should be 4096
81+
FooStream<10000> stream;
82+
83+
stream.readAllText(10001).then([&](auto bytes) {
84+
KJ_ASSERT(bytes.size() == 10000);
85+
KJ_ASSERT(memcmp(bytes.begin(), stream.buf().begin(), 10000) == 0);
86+
}).wait(waitScope);
87+
88+
KJ_ASSERT(stream.numreads() == 3);
89+
KJ_ASSERT(stream.maxMaxBytesSeen() == 4096);
90+
}
91+
92+
KJ_TEST("test2") {
93+
kj::EventLoop loop;
94+
kj::WaitScope waitScope(loop);
95+
96+
// In this second case, the stream does report a size, so we should see
97+
// only one read.
98+
BarStream<10000> stream;
99+
100+
stream.readAllBytes(10001).then([&](auto bytes) {
101+
KJ_ASSERT(bytes.size() == 10000);
102+
KJ_ASSERT(memcmp(bytes.begin(), stream.buf().begin(), 10000) == 0);
103+
}).wait(waitScope);
104+
105+
KJ_ASSERT(stream.numreads() == 2);
106+
KJ_ASSERT(stream.maxMaxBytesSeen() == 10000);
107+
}
108+
109+
KJ_TEST("test2 (text)") {
110+
kj::EventLoop loop;
111+
kj::WaitScope waitScope(loop);
112+
113+
// In this second case, the stream does report a size, so we should see
114+
// only one read.
115+
BarStream<10000> stream;
116+
117+
stream.readAllText(10001).then([&](auto bytes) {
118+
KJ_ASSERT(bytes.size() == 10000);
119+
KJ_ASSERT(memcmp(bytes.begin(), stream.buf().begin(), 10000) == 0);
120+
}).wait(waitScope);
121+
122+
KJ_ASSERT(stream.numreads() == 2);
123+
KJ_ASSERT(stream.maxMaxBytesSeen() == 10000);
124+
}
125+
126+
KJ_TEST("zero-length stream") {
127+
kj::EventLoop loop;
128+
kj::WaitScope waitScope(loop);
129+
130+
class Zero : public ReadableStreamSource {
131+
public:
132+
kj::Promise<size_t> tryRead(void*, size_t, size_t) {
133+
return (size_t)0;
134+
}
135+
kj::Maybe<uint64_t> tryGetLength(StreamEncoding encoding) {
136+
return (size_t)0;
137+
}
138+
};
139+
140+
Zero zero;
141+
zero.readAllBytes(10).then([&](kj::Array<kj::byte> bytes) {
142+
KJ_ASSERT(bytes.size() == 0);
143+
}).wait(waitScope);
144+
}
145+
146+
KJ_TEST("lying stream") {
147+
kj::EventLoop loop;
148+
kj::WaitScope waitScope(loop);
149+
150+
class Dishonest : public FooStream<10000> {
151+
public:
152+
kj::Maybe<uint64_t> tryGetLength(StreamEncoding encoding) {
153+
return (size_t)10;
154+
}
155+
};
156+
157+
Dishonest stream;
158+
stream.readAllBytes(10001).then([&](kj::Array<kj::byte> bytes) {
159+
// The stream lies! it says there are only 10 bytes but there are more.
160+
// oh well, we at least make sure we get the right result.
161+
KJ_ASSERT(bytes.size() == 10000);
162+
}).wait(waitScope);
163+
164+
KJ_ASSERT(stream.numreads() == 1001);
165+
KJ_ASSERT(stream.maxMaxBytesSeen() == 10);
166+
}
167+
168+
KJ_TEST("honest small stream") {
169+
kj::EventLoop loop;
170+
kj::WaitScope waitScope(loop);
171+
172+
class HonestSmall : public FooStream<100> {
173+
public:
174+
kj::Maybe<uint64_t> tryGetLength(StreamEncoding encoding) {
175+
return (size_t)100;
176+
}
177+
};
178+
179+
HonestSmall stream;
180+
stream.readAllBytes(1001).then([&](kj::Array<kj::byte> bytes) {
181+
KJ_ASSERT(bytes.size() == 100);
182+
}).wait(waitScope);
183+
184+
KJ_ASSERT(stream.numreads() == 2);
185+
KJ_ASSERT(stream.maxMaxBytesSeen(), 100);
186+
}
187+
188+
} // namespace
189+
} // namespace workerd::api

src/workerd/api/streams/internal.c++

Lines changed: 107 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -78,35 +78,125 @@ private:
7878

7979
template <typename T>
8080
kj::Promise<kj::Array<T>> read(ReadOption option = ReadOption::NONE) {
81+
// There are a few complexities in this operation that make it difficult to completely
82+
// optimize. The most important is that even if a stream reports an expected length
83+
// using tryGetLength, we really don't know how much data the stream will produce until
84+
// we try to read it. The only signal we have that the stream is done producing data
85+
// is a zero-length result from tryRead. Unfortuntately, we have to allocate a buffer
86+
// in advance of calling tryRead so we have to guess a bit at the size of the buffer
87+
// to allocate.
88+
//
89+
// In the previous implementation of this method, we would just blindly allocate a
90+
// 4096 byte buffer on every allocation, limiting each read iteration to a maximum
91+
// of 4096 bytes. This works fine for streams producing a small amount of data but
92+
// risks requiring a greater number of loop iterations and small allocations for streams
93+
// that produce larger amounts of data. Also in the previous implementation, every
94+
// loop iteration would allocate a new buffer regardless of how much of the previous
95+
// allocation was actually used -- so a stream that produces only 4000 bytes total
96+
// but only provides 10 bytes per iteration would end up with 400 reads and 400 4096
97+
// byte allocations. Doh! Fortunately our stream implementations tend to be a bit
98+
// smarter than that but it's still a worst case possibility that it's likely better
99+
// to avoid.
100+
//
101+
// So this implementation does things a bit differently.
102+
// First, we check to see if the stream can give an estimate on how much data it
103+
// expects to produce. If that length is within a given threshold, then best case
104+
// is we can perform the entire read with at most two allocations and two calls to
105+
// tryRead. The first allocation will be for the entire expected size of the stream,
106+
// which the first tryRead will attempt to fulfill completely. In the best case the
107+
// stream provides all of the data. The next allocation would be smaller and would
108+
// end up resulting in a zero-length read signaling that we are done. Hooray!
109+
//
110+
// Not everything can be best case scenario tho, unfortunately. If our first tryRead
111+
// does not fully consume the stream or fully fill the desination buffer, we're
112+
// going to need to try again. It is possible that the new allocation in the next
113+
// iteration will be wasted if the stream doesn't have any more data so it's important
114+
// for us to try to be conservative with the allocation. If the running total of data
115+
// we've seen so far is equal to or greater than the expected total length of the stream,
116+
// then the most likely case is that the next read will be zero-length -- but unfortunately
117+
// we can't know for sure! So for this we will fall back to a more conservative allocation
118+
// which is either 4096 bytes or the calculated amountToRead, whichever is the lower number.
119+
81120
kj::Vector<kj::Array<T>> parts;
82121
uint64_t runningTotal = 0;
83-
static constexpr size_t DEFAULT_BUFFER_CHUNK = 4096;
84-
static constexpr size_t MAX_BUFFER_CHUNK = DEFAULT_BUFFER_CHUNK * 4;
122+
static constexpr uint64_t MIN_BUFFER_CHUNK = 1024;
123+
static constexpr uint64_t DEFAULT_BUFFER_CHUNK = 4096;
124+
static constexpr uint64_t MAX_BUFFER_CHUNK = DEFAULT_BUFFER_CHUNK * 4;
85125

86126
// If we know in advance how much data we'll be reading, then we can attempt to
87127
// optimize the loop here by setting the value specifically so we are only
88-
// allocating once. But, to be safe, let's enforce an upper bound on each allocation
89-
// even if we do know the total.
90-
size_t amountToRead = kj::min(MAX_BUFFER_CHUNK,
91-
input.tryGetLength(StreamEncoding::IDENTITY).orDefault(DEFAULT_BUFFER_CHUNK));
92-
128+
// allocating at most twice. But, to be safe, let's enforce an upper bound on each
129+
// allocation even if we do know the total.
130+
kj::Maybe<uint64_t> maybeLength = input.tryGetLength(StreamEncoding::IDENTITY);
131+
132+
// The amountToRead is the regular allocation size we'll use right up until we've
133+
// read the number of expected bytes (if known). This number is calculated as the
134+
// minimum of (limit, MAX_BUFFER_CHUNK, maybeLength or DEFAULT_BUFFER_CHUNK). In
135+
// the best case scenario, this number is calculated such that we can read the
136+
// entire stream in one go if the amount of data is small enough and the stream
137+
// is well behaved.
138+
// If the stream does report a length, once we've read that number of bytes, we'll
139+
// fallback to the conservativeAllocation.
140+
uint64_t amountToRead = kj::min(limit,
141+
kj::min(MAX_BUFFER_CHUNK,
142+
maybeLength.orDefault(DEFAULT_BUFFER_CHUNK)));
143+
// amountToRead can be zero if the stream reported a zero-length. While the stream could
144+
// be lying about it's length, let's skip reading anything in this case.
93145
if (amountToRead > 0) {
94146
for (;;) {
95-
// TODO(perf): We can likely further optimize this loop by checking to see
96-
// how much of the buffer was filled and using the remaining buffer space if
97-
// it is not completely filled by the previous iteration. Doing so makes this
98-
// loop a bit more complicated tho, so for now let's keep things simple.
99147
auto bytes = kj::heapArray<T>(amountToRead);
100-
size_t amount = co_await input.tryRead(bytes.begin(), 1, bytes.size());
148+
// Note that we're passing amountToRead as the *minBytes* here so the tryRead should
149+
// attempt to fill the entire buffer. If it doesn't, the implication is that we read
150+
// everything.
151+
uint64_t amount = co_await input.tryRead(bytes.begin(), amountToRead, amountToRead);
152+
KJ_DASSERT(amount <= amountToRead);
101153

102-
if (amount == 0) {
154+
runningTotal += amount;
155+
JSG_REQUIRE(runningTotal < limit, TypeError, "Memory limit exceeded before EOF.");
156+
157+
if (amount < amountToRead) {
158+
// The stream has indicated that we're all done by returning a value less than the
159+
// full buffer length.
160+
// It is possible/likely that at least some amount of data was written to the buffer.
161+
// In which case we want to add that subset to the parts list here before we exit
162+
// the loop.
163+
if (amount > 0) {
164+
parts.add(bytes.slice(0, amount).attach(kj::mv(bytes)));
165+
}
103166
break;
104167
}
105168

106-
runningTotal += amount;
107-
JSG_REQUIRE(runningTotal < limit, TypeError, "Memory limit exceeded before EOF.");
108-
parts.add(bytes.slice(0, amount).attach(kj::mv(bytes)));
109-
};
169+
// Because we specify minSize equal to maxSize in the tryRead above, we should only
170+
// get here if the buffer was completely filled by the read. If it wasn't completely
171+
// filled, that is an indication that the stream is complete which is handled above.
172+
KJ_DASSERT(amount == bytes.size());
173+
parts.add(kj::mv(bytes));
174+
175+
// If the stream provided an expected length and our running total is equal to
176+
// or greater than that length then we assume we're done.
177+
KJ_IF_SOME(length, maybeLength) {
178+
if (runningTotal >= length) {
179+
// We've read everything we expect to read but some streams need to be read
180+
// completely in order to properly finish and other streams might lie (although
181+
// they shouldn't). Sigh. So we're going to make the next allocation potentially
182+
// smaller and keep reading until we get a zero length. In the best case, the next
183+
// read is going to be zero length but we have to try which will require at least
184+
// one additional (potentially wasted) allocation. (If we don't there are multiple
185+
// test failures).
186+
amountToRead = kj::min(MIN_BUFFER_CHUNK, amountToRead);
187+
continue;
188+
}
189+
}
190+
}
191+
}
192+
193+
KJ_IF_SOME(length, maybeLength) {
194+
if (runningTotal > length) {
195+
// Realistically runningTotal should never be more than length so we'll emit
196+
// a warning if it is just so we know. It would be indicative of a bug somewhere
197+
// in the implementation.
198+
KJ_LOG(WARNING, "ReadableStream provided more data than advertised", runningTotal, length);
199+
}
110200
}
111201

112202
if (option == ReadOption::NULL_TERMINATE) {

0 commit comments

Comments
 (0)