-
Notifications
You must be signed in to change notification settings - Fork 1
/
multibuffer_test.go
332 lines (273 loc) · 8.46 KB
/
multibuffer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
package multireader_test
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"math/rand"
"testing"
"github.com/mcpherrinm/multireader"
"github.com/stretchr/testify/require"
)
func randBytes(t *testing.T, count int) []byte {
data := make([]byte, count)
_, err := rand.Read(data) //nolint: gosec // OK for this to be weak
require.NoError(t, err)
return data
}
func requireRead(t *testing.T, data []byte, reader io.Reader) {
buf := make([]byte, len(data))
n, err := reader.Read(buf)
require.NoError(t, err)
require.Equal(t, len(data), n)
require.Equal(t, data, buf)
}
func requireWrite(t *testing.T, data []byte, writer io.Writer) {
n, err := writer.Write(data)
require.NoError(t, err)
require.Equal(t, len(data), n)
}
func requireSeek(t *testing.T, offset int64, whence int, expected int64, seeker io.Seeker) {
where, err := seeker.Seek(offset, whence)
require.NoError(t, err)
require.EqualValues(t, expected, where)
}
// TestWriteCloseRead is the simple no-contention case:
// Write data to buffer then close
// Multiple readers after-the-fact
func TestWriteCloseRead(t *testing.T) {
count := 10000
data := randBytes(t, count)
mr := multireader.New()
// Create a reader before and after writing
beforeReader := mr.Reader()
requireWrite(t, data, mr)
require.NoError(t, mr.Close())
afterReader := mr.Reader()
beforeData, err := ioutil.ReadAll(beforeReader)
require.NoError(t, err)
afterData, err := ioutil.ReadAll(afterReader)
require.NoError(t, err)
require.Equal(t, data, beforeData)
require.Equal(t, data, afterData)
}
// TestInterlacedNonblocking does a series of different-sized writes followed by a read of the same size.
// The readers should be able to follow without blocking.
func TestInterlacedNonblocking(t *testing.T) {
mr := multireader.New()
readers := []io.Reader{mr.Reader(), mr.Reader(), mr.Reader()}
for i := 0; i < 5000; i++ {
data := randBytes(t, i)
n, err := mr.Write(data)
require.NoError(t, err)
require.Equal(t, i, n)
for _, reader := range readers {
requireRead(t, data, reader)
}
}
require.NoError(t, mr.Close())
for _, reader := range readers {
buf := make([]byte, 100)
n, err := reader.Read(buf)
require.Equal(t, io.EOF, err)
require.Zero(t, n)
}
}
// TestInterlacedBlocking sets up a few readers that try to read, might block, and then we write.
// Then we wait on all reads to complete. Finally, we set the readers to read, and Close() the writer,
// verifying they unblock and get an io.EOF.
func TestInterlacedBlocking(t *testing.T) {
mr := multireader.New()
readers := []io.Reader{mr.Reader(), mr.Reader(), mr.Reader()}
for i := 0; i < 5000; i++ {
data := randBytes(t, i)
errChan := make(chan error)
for _, reader := range readers {
go func(reader io.Reader, data []byte, errChan chan error) {
buf := make([]byte, len(data))
n, err := reader.Read(buf)
if n != len(data) {
errChan <- fmt.Errorf("unexpected read length: %d != %d", len(data), n)
return
}
if err != nil {
errChan <- fmt.Errorf("unexpected error from read: %w", err)
return
}
errChan <- nil
}(reader, data, errChan)
}
requireWrite(t, data, mr)
// Check we got the expected number of responses back after writing
for range readers {
require.NoError(t, <-errChan)
}
}
// Have readers block
errChan := make(chan error)
for _, reader := range readers {
go func(reader io.Reader, errChan chan error) {
buf := make([]byte, 1000)
n, err := reader.Read(buf)
if n != 0 {
errChan <- fmt.Errorf("expected to read 0 bytes, not %d", n)
return
}
if err != io.EOF {
errChan <- fmt.Errorf("unexpected error from read: %w", err)
return
}
errChan <- nil
}(reader, errChan)
}
require.NoError(t, mr.Close())
// Check we got the expected number of responses back after closing
for range readers {
require.NoError(t, <-errChan)
}
}
// TestFuzz launches a series of readers, doing random-sized reads, and a writer, doing random-sized writes.
// All readers should end up with the same data at the end.
func TestFuzz(t *testing.T) {
mr := multireader.New()
data := randBytes(t, 10000+rand.Intn(100000))
errChan := make(chan error)
go func(writer io.WriteCloser, data []byte, errChan chan error) {
for len(data) > 0 {
amount := 1 + rand.Intn(len(data))
n, err := writer.Write(data[:amount])
data = data[amount:]
if err != nil {
errChan <- fmt.Errorf("error writing: %w", err)
return
}
if n != amount {
errChan <- fmt.Errorf("wrote wrong amount %d != %d", n, amount)
return
}
}
errChan <- writer.Close()
}(mr, data, errChan)
readers := rand.Intn(100)
for i := 0; i < readers; i++ {
go func(reader io.ReadSeeker, data []byte, errChan chan error) {
gotData := make([]byte, 0, len(data))
for {
amount := rand.Intn(len(data))
buf := make([]byte, amount)
n, err := reader.Read(buf)
if err != nil && err != io.EOF {
errChan <- fmt.Errorf("error reading: %w", err)
return
}
gotData = append(gotData, buf[:n]...)
if err == io.EOF {
if !bytes.Equal(gotData, data) {
errChan <- fmt.Errorf("gotData wasn't expected: %v != %v", gotData, data)
return
}
errChan <- nil
return
}
}
}(mr.Reader(), data, errChan)
}
// One final reader, to ensure we can still read all the data
gotData, err := ioutil.ReadAll(mr.Reader())
require.NoError(t, err)
require.Equal(t, data, gotData)
// readers + the writer
for i := 0; i < readers+1; i++ {
require.NoError(t, <-errChan)
}
}
func TestReaderSeek(t *testing.T) {
mr := multireader.New()
reader := mr.Reader()
// No bytes written or read, so no matter whence, a 0 offset seek puts us at 0
for _, whence := range []int{io.SeekStart, io.SeekCurrent, io.SeekEnd} {
where, err := reader.Seek(0, whence)
require.NoError(t, err)
require.Zero(t, where)
}
data := randBytes(t, 1024)
requireWrite(t, data, mr)
// Seek from start to the end
requireSeek(t, 1024, io.SeekStart, 1024, reader)
// Seek to middle:
requireSeek(t, -512, io.SeekCurrent, 512, reader)
requireRead(t, data[512:], reader)
requireSeek(t, -512, io.SeekEnd, 512, reader)
requireRead(t, data[512:], reader)
requireSeek(t, 512, io.SeekStart, 512, reader)
requireRead(t, data[512:], reader)
// Append more data and close
requireWrite(t, data, mr)
require.NoError(t, mr.Close())
// seek to end
end, err := reader.Seek(0, io.SeekEnd)
require.NoError(t, err)
require.EqualValues(t, 2048, end)
// At end, should read EOF
buf := make([]byte, 1)
n, err := reader.Read(buf)
require.Zero(t, n)
require.Equal(t, io.EOF, err)
// Seek back to start and we should be able to read the whole thing
start, err := reader.Seek(0, io.SeekStart)
require.NoError(t, err)
require.Zero(t, start)
buf = make([]byte, 2048)
n, err = reader.Read(buf)
require.EqualValues(t, 2048, n)
require.NoError(t, err)
require.Equal(t, data, buf[1024:])
require.Equal(t, data, buf[:1024])
// Error cases:
// Unknown whence
_, err = reader.Seek(0, 12345)
require.Error(t, err)
// Negative offset after seeking
_, err = reader.Seek(-1, io.SeekStart)
require.Error(t, err)
// Offset too long
_, err = reader.Seek(2049, io.SeekStart)
require.Error(t, err)
}
// TestZeroByte reads and writes.
// While they're not particularly useful, it's good to support them.
// Notably, bugs came up with them in the above randomized testing.
func TestZeroByte(t *testing.T) {
mr := multireader.New()
reader := mr.Reader()
// Ensure a zero-byte read succeeds before any writes:
requireRead(t, []byte{}, reader)
// ensure a zero-byte write succeeds:
requireWrite(t, []byte{}, mr)
requireWrite(t, []byte("data"), mr)
require.NoError(t, mr.Close())
// A version of this code had a bug where io.EOF was returned for any zero-byte read after closing
requireRead(t, []byte{}, reader)
requireRead(t, []byte("data"), reader)
// A zero-byte read should return EOF after closed and all data has been read:
n, err := reader.Read([]byte{})
require.Zero(t, n)
require.Equal(t, io.EOF, err)
// Writes after close should fail
n, err = mr.Write([]byte("abc"))
require.Zero(t, n)
require.Error(t, err)
}
// TestShortRead makes sure we properly handle the case where the passed-in buffer is longer than
// the amount of data available.
func TestShortRead(t *testing.T) {
mr := multireader.New()
reader := mr.Reader()
data := randBytes(t, 1000)
requireWrite(t, data, mr)
buf := make([]byte, 2048)
n, err := reader.Read(buf)
require.NoError(t, err)
require.EqualValues(t, 1000, n)
require.Equal(t, buf[:n], data)
}