Skip to content

Commit dd61f0c

Browse files
authored
Add PipeReader.Create from ReadOnlySequence<byte> (#48369)
Add new factory method to PipeReader to create a PipeReader from a ReadOnlySequence<byte>
1 parent 009c978 commit dd61f0c

File tree

6 files changed

+342
-0
lines changed

6 files changed

+342
-0
lines changed

src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ protected PipeReader() { }
5050
public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Pipelines.PipeWriter destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
5151
public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
5252
public static System.IO.Pipelines.PipeReader Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeReaderOptions? readerOptions = null) { throw null; }
53+
public static System.IO.Pipelines.PipeReader Create(System.Buffers.ReadOnlySequence<byte> sequence) { throw null; }
5354
[System.ObsoleteAttribute("OnWriterCompleted may not be invoked on all implementations of PipeReader. This will be removed in a future release.")]
5455
public virtual void OnWriterCompleted(System.Action<System.Exception?, object?> callback, object? state) { }
5556
public abstract System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult> ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));

src/libraries/System.IO.Pipelines/src/System.IO.Pipelines.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
<Compile Include="System\IO\Pipelines\PipeWriterStream.cs" />
3232
<Compile Include="System\IO\Pipelines\ReadResult.cs" />
3333
<Compile Include="System\IO\Pipelines\ResultFlags.cs" />
34+
<Compile Include="System\IO\Pipelines\SequencePipeReader.cs" />
3435
<Compile Include="System\IO\Pipelines\StreamPipeExtensions.cs" />
3536
<Compile Include="System\IO\Pipelines\StreamPipeReader.cs" />
3637
<Compile Include="System\IO\Pipelines\StreamPipeReaderOptions.cs" />

src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,16 @@ public static PipeReader Create(Stream stream, StreamPipeReaderOptions? readerOp
104104
return new StreamPipeReader(stream, readerOptions ?? StreamPipeReaderOptions.s_default);
105105
}
106106

107+
/// <summary>
108+
/// Creates a <see cref="PipeReader"/> wrapping the specified <see cref="ReadOnlySequence{T}"/>.
109+
/// </summary>
110+
/// <param name="sequence">The sequence.</param>
111+
/// <returns>A <see cref="PipeReader"/> that wraps the <see cref="ReadOnlySequence{T}"/>.</returns>
112+
public static PipeReader Create(ReadOnlySequence<byte> sequence)
113+
{
114+
return new SequencePipeReader(sequence);
115+
}
116+
107117
/// <summary>Asynchronously reads the bytes from the <see cref="System.IO.Pipelines.PipeReader" /> and writes them to the specified <see cref="System.IO.Pipelines.PipeWriter" />, using a specified buffer size and cancellation token.</summary>
108118
/// <param name="destination">The pipe writer to which the contents of the current stream will be copied.</param>
109119
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="System.Threading.CancellationToken.None" />.</param>
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Buffers;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
8+
namespace System.IO.Pipelines
9+
{
10+
internal sealed class SequencePipeReader : PipeReader
11+
{
12+
private ReadOnlySequence<byte> _sequence;
13+
private bool _isReaderCompleted;
14+
15+
private int _cancelNext;
16+
17+
public SequencePipeReader(ReadOnlySequence<byte> sequence)
18+
{
19+
_sequence = sequence;
20+
}
21+
22+
/// <inheritdoc />
23+
public override void AdvanceTo(SequencePosition consumed)
24+
{
25+
AdvanceTo(consumed, consumed);
26+
}
27+
28+
/// <inheritdoc />
29+
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
30+
{
31+
ThrowIfCompleted();
32+
33+
// Fast path: did we consume everything?
34+
if (consumed.Equals(_sequence.End))
35+
{
36+
_sequence = ReadOnlySequence<byte>.Empty;
37+
return;
38+
}
39+
40+
_sequence = _sequence.Slice(consumed);
41+
}
42+
43+
/// <inheritdoc />
44+
public override void CancelPendingRead()
45+
{
46+
Interlocked.Exchange(ref _cancelNext, 1);
47+
}
48+
49+
/// <inheritdoc />
50+
public override void Complete(Exception? exception = null)
51+
{
52+
if (_isReaderCompleted)
53+
{
54+
return;
55+
}
56+
57+
_isReaderCompleted = true;
58+
_sequence = ReadOnlySequence<byte>.Empty;
59+
}
60+
61+
/// <inheritdoc />
62+
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
63+
{
64+
if (TryRead(out ReadResult result))
65+
{
66+
return new ValueTask<ReadResult>(result);
67+
}
68+
69+
result = new ReadResult(ReadOnlySequence<byte>.Empty, isCanceled: false, isCompleted: true);
70+
return new ValueTask<ReadResult>(result);
71+
}
72+
73+
/// <inheritdoc />
74+
public override bool TryRead(out ReadResult result)
75+
{
76+
ThrowIfCompleted();
77+
78+
bool isCancellationRequested = Interlocked.Exchange(ref _cancelNext, 0) == 1;
79+
if (isCancellationRequested || _sequence.Length > 0)
80+
{
81+
result = new ReadResult(_sequence, isCancellationRequested, isCompleted: true);
82+
return true;
83+
}
84+
85+
result = default;
86+
return false;
87+
}
88+
89+
private void ThrowIfCompleted()
90+
{
91+
if (_isReaderCompleted)
92+
{
93+
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
94+
}
95+
}
96+
}
97+
}
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System;
5+
using System.Buffers;
6+
using System.Collections.Generic;
7+
using System.Linq;
8+
using System.Text;
9+
using System.Threading.Tasks;
10+
using Xunit;
11+
12+
namespace System.IO.Pipelines.Tests
13+
{
14+
public class SequencePipeReaderTests
15+
{
16+
[Fact]
17+
public async Task CanRead()
18+
{
19+
var sequence = new ReadOnlySequence<byte>(Encoding.ASCII.GetBytes("Hello World"));
20+
var reader = PipeReader.Create(sequence);
21+
22+
ReadResult readResult = await reader.ReadAsync();
23+
ReadOnlySequence<byte> buffer = readResult.Buffer;
24+
25+
Assert.Equal(11, buffer.Length);
26+
Assert.True(buffer.IsSingleSegment);
27+
Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray()));
28+
29+
reader.AdvanceTo(buffer.End);
30+
reader.Complete();
31+
}
32+
33+
[Fact]
34+
public async Task TryReadReturnsTrueIfBufferedBytesAndNotExaminedEverything()
35+
{
36+
var sequence = new ReadOnlySequence<byte>(Encoding.ASCII.GetBytes("Hello World"));
37+
var reader = PipeReader.Create(sequence);
38+
39+
ReadResult readResult = await reader.ReadAsync();
40+
ReadOnlySequence<byte> buffer = readResult.Buffer;
41+
Assert.Equal(11, buffer.Length);
42+
Assert.True(buffer.IsSingleSegment);
43+
reader.AdvanceTo(buffer.Start, buffer.GetPosition(5));
44+
45+
Assert.True(reader.TryRead(out readResult));
46+
Assert.Equal(11, buffer.Length);
47+
Assert.True(buffer.IsSingleSegment);
48+
Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray()));
49+
50+
reader.Complete();
51+
}
52+
53+
[Fact]
54+
public async Task TryReadReturnsFalseIfBufferedBytesAndEverythingExamined()
55+
{
56+
var sequence = new ReadOnlySequence<byte>(Encoding.ASCII.GetBytes("Hello World"));
57+
var reader = PipeReader.Create(sequence);
58+
59+
ReadResult readResult = await reader.ReadAsync();
60+
ReadOnlySequence<byte> buffer = readResult.Buffer;
61+
Assert.Equal(11, buffer.Length);
62+
Assert.True(buffer.IsSingleSegment);
63+
reader.AdvanceTo(buffer.End);
64+
65+
Assert.False(reader.TryRead(out readResult));
66+
reader.Complete();
67+
}
68+
69+
[Fact]
70+
public async Task ReadAsyncAfterReceivingCompletedReadResultDoesNotThrow()
71+
{
72+
var sequence = ReadOnlySequence<byte>.Empty;
73+
PipeReader reader = PipeReader.Create(sequence);
74+
ReadResult readResult = await reader.ReadAsync();
75+
Assert.True(readResult.Buffer.IsEmpty);
76+
Assert.True(readResult.IsCompleted);
77+
reader.AdvanceTo(readResult.Buffer.End);
78+
79+
readResult = await reader.ReadAsync();
80+
Assert.True(readResult.Buffer.IsEmpty);
81+
Assert.True(readResult.IsCompleted);
82+
reader.AdvanceTo(readResult.Buffer.End);
83+
reader.Complete();
84+
}
85+
86+
[Fact]
87+
public async Task DataCanBeReadMultipleTimes()
88+
{
89+
var helloBytes = Encoding.ASCII.GetBytes("Hello World");
90+
var sequence = new ReadOnlySequence<byte>(helloBytes);
91+
PipeReader reader = PipeReader.Create(sequence);
92+
93+
94+
ReadResult readResult = await reader.ReadAsync();
95+
ReadOnlySequence<byte> buffer = readResult.Buffer;
96+
reader.AdvanceTo(buffer.Start, buffer.End);
97+
98+
// Make sure IsCompleted is true
99+
readResult = await reader.ReadAsync();
100+
buffer = readResult.Buffer;
101+
reader.AdvanceTo(buffer.Start, buffer.End);
102+
Assert.True(readResult.IsCompleted);
103+
104+
var value = await ReadFromPipeAsString(reader);
105+
Assert.Equal("Hello World", value);
106+
reader.Complete();
107+
}
108+
109+
[Fact]
110+
public async Task NextReadAfterPartiallyExaminedReturnsImmediately()
111+
{
112+
var sequence = new ReadOnlySequence<byte>(Encoding.ASCII.GetBytes(new string('a', 10000)));
113+
PipeReader reader = PipeReader.Create(sequence);
114+
115+
ReadResult readResult = await reader.ReadAsync();
116+
reader.AdvanceTo(readResult.Buffer.Start, readResult.Buffer.GetPosition(2048));
117+
118+
ValueTask<ReadResult> task = reader.ReadAsync();
119+
120+
// This should complete synchronously since
121+
Assert.True(task.IsCompleted);
122+
123+
readResult = await task;
124+
reader.AdvanceTo(readResult.Buffer.End);
125+
reader.Complete();
126+
}
127+
128+
[Fact]
129+
public async Task CompleteReaderWithoutAdvanceDoesNotThrow()
130+
{
131+
PipeReader reader = PipeReader.Create(ReadOnlySequence<byte>.Empty);
132+
await reader.ReadAsync();
133+
reader.Complete();
134+
}
135+
136+
[Fact]
137+
public async Task AdvanceAfterCompleteThrows()
138+
{
139+
PipeReader reader = PipeReader.Create(new ReadOnlySequence<byte>(new byte[100]));
140+
ReadOnlySequence<byte> buffer = (await reader.ReadAsync()).Buffer;
141+
142+
reader.Complete();
143+
144+
Assert.Throws<InvalidOperationException>(() => reader.AdvanceTo(buffer.End));
145+
}
146+
147+
[Fact]
148+
public async Task ThrowsOnReadAfterCompleteReader()
149+
{
150+
PipeReader reader = PipeReader.Create(ReadOnlySequence<byte>.Empty);
151+
152+
reader.Complete();
153+
await Assert.ThrowsAsync<InvalidOperationException>(async () => await reader.ReadAsync());
154+
}
155+
156+
[Fact]
157+
public void TryReadAfterCancelPendingReadReturnsTrue()
158+
{
159+
PipeReader reader = PipeReader.Create(ReadOnlySequence<byte>.Empty);
160+
161+
reader.CancelPendingRead();
162+
163+
Assert.True(reader.TryRead(out ReadResult result));
164+
Assert.True(result.IsCanceled);
165+
reader.AdvanceTo(result.Buffer.End);
166+
reader.Complete();
167+
}
168+
169+
[Fact]
170+
public async Task ReadAsyncReturnsCanceledIfCanceledBeforeRead()
171+
{
172+
var sequence = new ReadOnlySequence<byte>(new byte[10000]);
173+
PipeReader reader = PipeReader.Create(sequence);
174+
175+
// Make sure state isn't used from before
176+
for (var i = 0; i < 3; i++)
177+
{
178+
reader.CancelPendingRead();
179+
ValueTask<ReadResult> readResultTask = reader.ReadAsync();
180+
Assert.True(readResultTask.IsCompleted);
181+
ReadResult readResult = readResultTask.GetAwaiter().GetResult();
182+
Assert.True(readResult.IsCanceled);
183+
readResult = await reader.ReadAsync();
184+
reader.AdvanceTo(readResult.Buffer.End);
185+
}
186+
187+
reader.Complete();
188+
}
189+
190+
[Fact]
191+
public async Task ReadAsyncReturnsCanceledInterleaved()
192+
{
193+
var sequence = new ReadOnlySequence<byte>(new byte[10000]);
194+
PipeReader reader = PipeReader.Create(sequence);
195+
196+
// Cancel and Read interleaved to confirm cancellations are independent
197+
for (var i = 0; i < 3; i++)
198+
{
199+
reader.CancelPendingRead();
200+
ValueTask<ReadResult> readResultTask = reader.ReadAsync();
201+
Assert.True(readResultTask.IsCompleted);
202+
ReadResult readResult = readResultTask.GetAwaiter().GetResult();
203+
Assert.True(readResult.IsCanceled);
204+
205+
readResult = await reader.ReadAsync();
206+
Assert.False(readResult.IsCanceled);
207+
}
208+
209+
reader.Complete();
210+
}
211+
212+
[Fact]
213+
public void OnWriterCompletedNoops()
214+
{
215+
bool fired = false;
216+
PipeReader reader = PipeReader.Create(ReadOnlySequence<byte>.Empty);
217+
#pragma warning disable CS0618 // Type or member is obsolete
218+
reader.OnWriterCompleted((_, __) => { fired = true; }, null);
219+
#pragma warning restore CS0618 // Type or member is obsolete
220+
reader.Complete();
221+
Assert.False(fired);
222+
}
223+
224+
private static async Task<string> ReadFromPipeAsString(PipeReader reader)
225+
{
226+
ReadResult readResult = await reader.ReadAsync();
227+
var result = Encoding.ASCII.GetString(readResult.Buffer.ToArray());
228+
reader.AdvanceTo(readResult.Buffer.End);
229+
return result;
230+
}
231+
}
232+
}

src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
<Compile Include="ReadAsyncCompletionTests.cs" />
3131
<Compile Include="ReadResultTests.cs" />
3232
<Compile Include="SchedulerFacts.cs" />
33+
<Compile Include="SequencePipeReaderTests.cs" />
3334
<Compile Include="StreamPipeReaderTests.cs" />
3435
<Compile Include="Infrastructure\TestMemoryPool.cs" />
3536
<Compile Include="StreamPipeWriterTests.cs" />

0 commit comments

Comments
 (0)