Skip to content

Commit 1e942b2

Browse files
committed
HTTP/3: Support canceling requests that aren't reading a body
1 parent 5d6f95d commit 1e942b2

File tree

14 files changed

+217
-37
lines changed

14 files changed

+217
-37
lines changed

src/Servers/Connections.Abstractions/src/Features/IProtocolErrorCodeFeature.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ namespace Microsoft.AspNetCore.Connections.Features
99
public interface IProtocolErrorCodeFeature
1010
{
1111
/// <summary>
12-
/// Gets or sets the error code.
12+
/// Gets or sets the error code. The property returns -1 if the error code hasn't been set.
1313
/// </summary>
1414
long Error { get; set; }
1515
}

src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,30 @@ public void Initialize(Http3StreamContext context)
124124
}
125125

126126
_frameWriter.Reset(context.Transport.Output, context.ConnectionId);
127+
128+
// We register to connection closed to handle situation where the client
129+
// aborts after data is already sent. Only its read-side of the stream is aborted
130+
// which means Kestrel is only notified of the abort when it writes to the stream.
131+
// This event immediately notifies Kestrel that the client has aborted the request
132+
// and Kestrel will complete pipes and cancel the RequestAborted token.
133+
//
134+
// TODO: Consider a better way to provide this notification. For perf we want to
135+
// make the ConnectionClosed CTS pay-for-play, and change this event to use
136+
// something that is more lightweight than a CTS.
137+
context.StreamContext.ConnectionClosed.Register(static s =>
138+
{
139+
var stream = (Http3Stream)s!;
140+
141+
if (!stream.IsCompleted)
142+
{
143+
// An error code value other than -1 indicates a value was set and the request didn't gracefully complete.
144+
var errorCode = stream._errorCodeFeature.Error;
145+
if (errorCode >= 0)
146+
{
147+
stream.Abort(new ConnectionAbortedException(CoreStrings.Http2StreamResetByClient), (Http3ErrorCode)errorCode);
148+
}
149+
}
150+
}, this);
127151
}
128152

129153
public void InitializeWithExistingContext(IDuplexPipe transport)
@@ -465,7 +489,9 @@ public async Task ProcessRequestAsync<TContext>(IHttpApplication<TContext> appli
465489
catch (ConnectionResetException ex)
466490
{
467491
error = ex;
468-
Abort(new ConnectionAbortedException(ex.Message, ex), (Http3ErrorCode)_errorCodeFeature.Error);
492+
493+
var resolvedErrorCode = _errorCodeFeature.Error >= 0 ? _errorCodeFeature.Error : 0;
494+
Abort(new ConnectionAbortedException(ex.Message, ex), (Http3ErrorCode)resolvedErrorCode);
469495
}
470496
catch (Exception ex)
471497
{

src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.FeatureCollection.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,13 @@ internal sealed partial class QuicConnectionContext : IProtocolErrorCodeFeature,
1616
{
1717
private X509Certificate2? _clientCert;
1818
private Task<X509Certificate2?>? _clientCertTask;
19+
private long? _error;
1920

20-
public long Error { get; set; }
21+
public long Error
22+
{
23+
get => _error ?? -1;
24+
set => _error = value;
25+
}
2126

2227
public X509Certificate2? ClientCertificate
2328
{

src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,10 @@ public override void Abort(ConnectionAbortedException abortReason)
8383
return;
8484
}
8585

86+
var resolvedErrorCode = _error ?? 0;
8687
_abortReason = ExceptionDispatchInfo.Capture(abortReason);
87-
_log.ConnectionAbort(this, Error, abortReason.Message);
88-
_closeTask = _connection.CloseAsync(errorCode: Error).AsTask();
88+
_log.ConnectionAbort(this, resolvedErrorCode, abortReason.Message);
89+
_closeTask = _connection.CloseAsync(errorCode: resolvedErrorCode).AsTask();
8990
}
9091
}
9192

@@ -127,7 +128,7 @@ public override void Abort(ConnectionAbortedException abortReason)
127128
catch (QuicConnectionAbortedException ex)
128129
{
129130
// Shutdown initiated by peer, abortive.
130-
Error = ex.ErrorCode;
131+
_error = ex.ErrorCode;
131132
_log.ConnectionAborted(this, ex.ErrorCode, ex);
132133

133134
ThreadPool.UnsafeQueueUserWorkItem(state =>

src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.FeatureCollection.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,16 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal
99
internal sealed partial class QuicStreamContext : IPersistentStateFeature, IStreamDirectionFeature, IProtocolErrorCodeFeature, IStreamIdFeature, IStreamAbortFeature
1010
{
1111
private IDictionary<object, object?>? _persistentState;
12+
private long? _error;
1213

1314
public bool CanRead { get; private set; }
1415
public bool CanWrite { get; private set; }
1516

16-
public long Error { get; set; }
17+
public long Error
18+
{
19+
get => _error ?? -1;
20+
set => _error = value;
21+
}
1722

1823
public long StreamId { get; private set; }
1924

src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.IO;
88
using System.IO.Pipelines;
99
using System.Net.Quic;
10+
using System.Runtime.ExceptionServices;
1011
using System.Threading;
1112
using System.Threading.Tasks;
1213
using Microsoft.AspNetCore.Connections;
@@ -95,7 +96,7 @@ public void Initialize(QuicStream stream)
9596

9697
CanRead = _stream.CanRead;
9798
CanWrite = _stream.CanWrite;
98-
Error = 0;
99+
_error = null;
99100
StreamId = _stream.StreamId;
100101
PoolExpirationTicks = 0;
101102

@@ -148,6 +149,7 @@ private async Task StartAsync()
148149
// Streams may or may not have reading/writing, so only start tasks accordingly
149150
var receiveTask = Task.CompletedTask;
150151
var sendTask = Task.CompletedTask;
152+
var sendCompletedTask = Task.CompletedTask;
151153

152154
if (_stream.CanRead)
153155
{
@@ -157,18 +159,44 @@ private async Task StartAsync()
157159
if (_stream.CanWrite)
158160
{
159161
sendTask = DoSend();
162+
sendCompletedTask = WaitForWritesCompleted();
160163
}
161164

162165
// Now wait for both to complete
163166
await receiveTask;
164167
await sendTask;
168+
await sendCompletedTask;
169+
170+
await FireStreamClosedAsync();
165171
}
166172
catch (Exception ex)
167173
{
168174
_log.LogError(0, ex, $"Unexpected exception in {nameof(QuicStreamContext)}.{nameof(StartAsync)}.");
169175
}
170176
}
171177

178+
private async Task WaitForWritesCompleted()
179+
{
180+
Debug.Assert(_stream != null);
181+
182+
try
183+
{
184+
await _stream.WaitForWriteCompletionAsync();
185+
}
186+
catch (Exception ex)
187+
{
188+
// Send error to DoSend loop.
189+
lock (_shutdownLock)
190+
{
191+
_shutdownReason ??= ex;
192+
}
193+
}
194+
finally
195+
{
196+
Output.CancelPendingRead();
197+
}
198+
}
199+
172200
private async Task DoReceive()
173201
{
174202
Debug.Assert(_stream != null);
@@ -204,6 +232,9 @@ private async Task DoReceive()
204232
if (completeTask.IsCompletedSuccessfully)
205233
{
206234
// Fast path. CompleteAsync completed immediately.
235+
// Most implementations of ValueTask reset state in GetResult.
236+
completeTask.GetAwaiter().GetResult();
237+
207238
flushTask = ValueTask.FromResult(new FlushResult(isCanceled: false, isCompleted: true));
208239
}
209240
else
@@ -240,7 +271,7 @@ private async Task DoReceive()
240271
catch (QuicStreamAbortedException ex)
241272
{
242273
// Abort from peer.
243-
Error = ex.ErrorCode;
274+
_error = ex.ErrorCode;
244275
_log.StreamAborted(this, ex.ErrorCode, ex);
245276

246277
// This could be ignored if _shutdownReason is already set.
@@ -268,10 +299,6 @@ private async Task DoReceive()
268299
{
269300
// If Shutdown() has already bee called, assume that was the reason ProcessReceives() exited.
270301
Input.Complete(ResolveCompleteReceiveException(error));
271-
272-
FireStreamClosed();
273-
274-
await _waitForConnectionClosedTcs.Task;
275302
}
276303

277304
async static ValueTask<FlushResult> AwaitCompleteTaskAsync(ValueTask completeTask)
@@ -286,12 +313,12 @@ async static ValueTask<FlushResult> AwaitCompleteTaskAsync(ValueTask completeTas
286313
return _shutdownReadReason ?? _shutdownReason ?? error;
287314
}
288315

289-
private void FireStreamClosed()
316+
private Task FireStreamClosedAsync()
290317
{
291318
// Guard against scheduling this multiple times
292319
if (_streamClosed)
293320
{
294-
return;
321+
return Task.CompletedTask;
295322
}
296323

297324
_streamClosed = true;
@@ -304,6 +331,8 @@ private void FireStreamClosed()
304331
},
305332
this,
306333
preferLocal: false);
334+
335+
return _waitForConnectionClosedTcs.Task;
307336
}
308337

309338
private void CancelConnectionClosedToken()
@@ -335,6 +364,15 @@ private async Task DoSend()
335364

336365
if (result.IsCanceled)
337366
{
367+
// WaitForWritesCompleted provides immediate notification that write-side of stream has completed.
368+
// If the stream or connection is aborted then exception will be available to rethrow.
369+
370+
var ex = _shutdownWriteReason ?? _shutdownReason;
371+
if (ex != null)
372+
{
373+
ExceptionDispatchInfo.Throw(ex);
374+
}
375+
338376
break;
339377
}
340378

@@ -359,7 +397,7 @@ private async Task DoSend()
359397
catch (QuicStreamAbortedException ex)
360398
{
361399
// Abort from peer.
362-
Error = ex.ErrorCode;
400+
_error = ex.ErrorCode;
363401
_log.StreamAborted(this, ex.ErrorCode, ex);
364402

365403
// This could be ignored if _shutdownReason is already set.
@@ -370,7 +408,7 @@ private async Task DoSend()
370408
catch (QuicConnectionAbortedException ex)
371409
{
372410
// Abort from peer.
373-
Error = ex.ErrorCode;
411+
_error = ex.ErrorCode;
374412
_log.StreamAborted(this, ex.ErrorCode, ex);
375413

376414
// This could be ignored if _shutdownReason is already set.
@@ -385,6 +423,10 @@ private async Task DoSend()
385423
// System.Net.Quic exception handling not finalized.
386424
unexpectedError = ex;
387425
}
426+
catch (ConnectionAbortedException ex)
427+
{
428+
unexpectedError = ex;
429+
}
388430
catch (Exception ex)
389431
{
390432
shutdownReason = ex;
@@ -415,19 +457,20 @@ public override void Abort(ConnectionAbortedException abortReason)
415457
_serverAborted = true;
416458
_shutdownReason = abortReason;
417459

418-
_log.StreamAbort(this, Error, abortReason.Message);
460+
var resolvedErrorCode = _error ?? 0;
461+
_log.StreamAbort(this, resolvedErrorCode, abortReason.Message);
419462

420463
lock (_shutdownLock)
421464
{
422465
if (_stream != null)
423466
{
424467
if (_stream.CanRead)
425468
{
426-
_stream.AbortRead(Error);
469+
_stream.AbortRead(resolvedErrorCode);
427470
}
428471
if (_stream.CanWrite)
429472
{
430-
_stream.AbortWrite(Error);
473+
_stream.AbortWrite(resolvedErrorCode);
431474
}
432475
}
433476
}

src/Servers/Kestrel/Transport.Quic/test/QuicConnectionContextTests.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ public async Task AcceptAsync_ClientStartsAndStopsBidirectionStream_ServerAccept
181181
read = await serverStream.Transport.Input.ReadAsync().DefaultTimeout();
182182
Assert.True(read.IsCompleted);
183183

184+
await serverStream.Transport.Output.CompleteAsync();
185+
184186
await closedTcs.Task.DefaultTimeout();
185187
}
186188

src/Servers/Kestrel/Transport.Quic/test/QuicConnectionListenerTests.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,6 @@ public async Task ClientCertificate_Required_NotSent_ConnectionAborted()
112112

113113
var qex = await Assert.ThrowsAsync<QuicException>(async () => await clientConnection.ConnectAsync().DefaultTimeout());
114114
Assert.Equal("Connection has been shutdown by transport. Error Code: 0x80410100", qex.Message);
115-
116-
// https://github.com/dotnet/runtime/issues/57246 The accept still completes even though the connection was rejected, but it's already failed.
117-
var serverContext = await connectionListener.AcceptAndAddFeatureAsync().DefaultTimeout();
118-
await Assert.ThrowsAsync<QuicException>(() => serverContext.ConnectAsync().DefaultTimeout());
119115
}
120116
}
121117
}

src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ public async Task BidirectionalStream_ServerWritesDataAndDisposes_ClientReadsDat
139139
await serverStream.Transport.Input.CompleteAsync().DefaultTimeout();
140140
await serverStream.Transport.Output.CompleteAsync().DefaultTimeout();
141141

142+
Logger.LogInformation("Client reading until end of stream.");
143+
var data = await clientStream.ReadUntilEndAsync().DefaultTimeout();
144+
Assert.Equal(testData.Length, data.Length);
145+
Assert.Equal(testData, data);
146+
142147
var quicStreamContext = Assert.IsType<QuicStreamContext>(serverStream);
143148

144149
Logger.LogInformation("Server waiting for send and receiving loops to complete.");
@@ -150,11 +155,6 @@ public async Task BidirectionalStream_ServerWritesDataAndDisposes_ClientReadsDat
150155
await quicStreamContext.DisposeAsync().DefaultTimeout();
151156
quicStreamContext.Dispose();
152157

153-
Logger.LogInformation("Client reading until end of stream.");
154-
var data = await clientStream.ReadUntilEndAsync().DefaultTimeout();
155-
Assert.Equal(testData.Length, data.Length);
156-
Assert.Equal(testData, data);
157-
158158
var quicConnectionContext = Assert.IsType<QuicConnectionContext>(serverConnection);
159159

160160
Assert.Equal(1, quicConnectionContext.StreamPool.Count);
@@ -402,6 +402,7 @@ public async Task ServerToClientUnidirectionalStream_ServerAborts_ClientGetsAbor
402402

403403
Assert.Equal(TestData, data);
404404

405+
Logger.LogInformation("Server aborting stream");
405406
((IProtocolErrorCodeFeature)serverStream).Error = (long)Http3ErrorCode.InternalError;
406407
serverStream.Abort(new ConnectionAbortedException("Test message"));
407408

src/Servers/Kestrel/shared/test/Http3/Http3InMemory.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -922,7 +922,7 @@ internal class TestMultiplexedConnectionContext : MultiplexedConnectionContext,
922922
});
923923

924924
private readonly Http3InMemory _testBase;
925-
private long _error;
925+
private long? _error;
926926

927927
public TestMultiplexedConnectionContext(Http3InMemory testBase)
928928
{
@@ -946,7 +946,7 @@ public TestMultiplexedConnectionContext(Http3InMemory testBase)
946946

947947
public long Error
948948
{
949-
get => _error;
949+
get => _error ?? -1;
950950
set => _error = value;
951951
}
952952

@@ -1019,6 +1019,7 @@ internal class TestStreamContext : ConnectionContext, IStreamDirectionFeature, I
10191019

10201020
private TaskCompletionSource _disposingTcs;
10211021
private TaskCompletionSource _disposedTcs;
1022+
internal long? _error;
10221023

10231024
public TestStreamContext(bool canRead, bool canWrite, Http3InMemory testBase)
10241025
{
@@ -1072,6 +1073,7 @@ public void Initialize(long streamId)
10721073
ConnectionId = "TEST:" + streamId.ToString();
10731074
AbortReadException = null;
10741075
AbortWriteException = null;
1076+
_error = null;
10751077

10761078
_disposedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
10771079
_disposingTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -1112,7 +1114,11 @@ public override IDuplexPipe Transport
11121114

11131115
public bool CanWrite { get; }
11141116

1115-
public long Error { get; set; }
1117+
public long Error
1118+
{
1119+
get => _error ?? -1;
1120+
set => _error = value;
1121+
}
11161122

11171123
public override void Abort(ConnectionAbortedException abortReason)
11181124
{

0 commit comments

Comments
 (0)