From c7f976631efcd0eebd84aed21653c5261dd7ec7f Mon Sep 17 00:00:00 2001 From: Benoit Foucher Date: Fri, 16 Jun 2023 17:53:07 +0200 Subject: [PATCH] Support for StreamReceiveWindowSize, fixes #3316 (#3371) --- .../Slic/Internal/SlicConnection.cs | 67 +++++++++-------- .../Slic/Internal/SlicPipeReader.cs | 24 +++---- .../Slic/Internal/SlicPipeWriter.cs | 72 +++++++++++-------- .../Transports/Slic/Internal/SlicStream.cs | 70 +++++++++--------- .../Transports/Slic/SlicTransportOptions.cs | 40 +++++------ .../Transports/Slic/SlicTransportTests.cs | 51 ++++++------- 6 files changed, 167 insertions(+), 157 deletions(-) diff --git a/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs b/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs index 2444d83c67..4a54621409 100644 --- a/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs +++ b/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs @@ -23,19 +23,28 @@ internal class SlicConnection : IMultiplexedConnection /// Gets the minimum size of the segment requested from . internal int MinSegmentSize { get; } - internal int PauseWriterThreshold { get; } - - /// Gets the maximum size of packets accepted by the peer. + /// Gets the maximum size of packets accepted by the peer. This property is set to the value carried by the frame. internal int PeerPacketMaxSize { get; private set; } - // TODO: replace with a window size property - internal int PeerPauseWriterThreshold { get; private set; } + /// Gets the peer's initial stream window size. This property is set to the value carried by the + /// frame. + internal int PeerInitialStreamWindowSize { get; private set; } /// Gets the used for obtaining memory buffers. internal MemoryPool Pool { get; } - // TODO: replace with a window size property - internal int ResumeWriterThreshold { get; } + /// Gets the initial stream window size. + internal int InitialStreamWindowSize { get; } + + /// Gets the window update threshold. When the window size is increased and this threshold reached, a frame is sent. + internal int StreamWindowUpdateThreshold => InitialStreamWindowSize / StreamWindowUpdateRatio; + + // The ratio used to compute the StreamWindowUpdateThreshold. For now, the stream window update is sent when the + // window size grows over InitialStreamWindowSize / StreamWindowUpdateRatio. + private const int StreamWindowUpdateRatio = 2; private readonly Channel _acceptStreamChannel; private int _bidirectionalStreamCount; @@ -560,8 +569,7 @@ internal SlicConnection( _maxBidirectionalStreams = options.MaxBidirectionalStreams; _maxUnidirectionalStreams = options.MaxUnidirectionalStreams; - PauseWriterThreshold = slicOptions.PauseWriterThreshold; - ResumeWriterThreshold = slicOptions.ResumeWriterThreshold; + InitialStreamWindowSize = slicOptions.InitialStreamWindowSize; _localIdleTimeout = slicOptions.IdleTimeout; _packetMaxSize = slicOptions.PacketMaxSize; _acceptStreamChannel = Channel.CreateUnbounded(new UnboundedChannelOptions @@ -585,7 +593,6 @@ internal SlicConnection( // Initially set the peer packet max size to the local max size to ensure we can receive the first initialize // frame. PeerPacketMaxSize = _packetMaxSize; - PeerPauseWriterThreshold = PauseWriterThreshold; // We use the same stream ID numbering scheme as Quic. if (IsServer) @@ -792,8 +799,8 @@ internal async ValueTask WriteStreamDataFrameAsync( } // Notify the stream that we're consuming sendSize credit. It's important to call this before sending - // the stream frame to avoid race conditions where the StreamConsumed frame could be received before the - // send credit was updated. + // the stream frame to avoid race conditions where the StreamWindowUpdate frame could be received before + // the send credit was updated. if (sendCredit > 0) { stream.ConsumedSendCredit((int)(sendSource1.Length + sendSource2.Length)); @@ -900,7 +907,7 @@ private void AddStream(ulong id, SlicStream stream) private void DecodeParameters(IDictionary> parameters) { int? peerPacketMaxSize = null; - int? peerPauseWriterThreshold = null; + int? peerInitialStreamWindowSize = null; foreach ((ParameterKey key, IList buffer) in parameters) { switch (key) @@ -939,17 +946,17 @@ private void DecodeParameters(IDictionary> parameters) if (peerPacketMaxSize < 1024) { throw new InvalidDataException( - "The PacketMaxSize Slic connection parameter is invalid, it must be greater than 1KB."); + "The PacketMaxSize connection parameter is invalid, it must be greater than 1KB."); } break; } - case ParameterKey.PauseWriterThreshold: + case ParameterKey.InitialStreamWindowSize: { - peerPauseWriterThreshold = DecodeParamValue(buffer); - if (peerPauseWriterThreshold < 1024) + peerInitialStreamWindowSize = DecodeParamValue(buffer); + if (peerInitialStreamWindowSize < 1024) { throw new InvalidDataException( - "The PauseWriterThreshold Slic connection parameter is invalid, it must be greater than 1KB."); + "The InitialStreamWindowSize connection parameter is invalid, it must be greater than 1KB."); } break; } @@ -960,21 +967,21 @@ private void DecodeParameters(IDictionary> parameters) if (peerPacketMaxSize is null) { throw new InvalidDataException( - "The peer didn't send the required PacketMaxSize Slic connection parameter."); + "The peer didn't send the required PacketMaxSize connection parameter."); } else { PeerPacketMaxSize = peerPacketMaxSize.Value; } - if (peerPauseWriterThreshold is null) + if (peerInitialStreamWindowSize is null) { throw new InvalidDataException( - "The peer didn't send the required PauseWriterThreshold Slic connection parameter."); + "The peer didn't send the required InitialStreamWindowSize connection parameter."); } else { - PeerPauseWriterThreshold = peerPauseWriterThreshold.Value; + PeerInitialStreamWindowSize = peerInitialStreamWindowSize.Value; } // all parameter values are currently integers in the range 0..Int32Max encoded as varuint62. @@ -1001,7 +1008,7 @@ private Dictionary> EncodeParameters() { // Required parameters. EncodeParameter(ParameterKey.PacketMaxSize, (ulong)_packetMaxSize), - EncodeParameter(ParameterKey.PauseWriterThreshold, (ulong)PauseWriterThreshold) + EncodeParameter(ParameterKey.InitialStreamWindowSize, (ulong)InitialStreamWindowSize) }; // Optional parameters. @@ -1055,9 +1062,9 @@ private Task ReadFrameAsync(FrameType type, int size, ulong? streamId, Cancellat { return ReadStreamDataFrameAsync(type, size, streamId!.Value, cancellationToken); } - case FrameType.StreamConsumed: + case FrameType.StreamWindowUpdate: { - return ReadStreamConsumedFrameAsync(size, streamId!.Value, cancellationToken); + return ReadStreamWindowUpdateFrameAsync(size, streamId!.Value, cancellationToken); } case FrameType.StreamReadsClosed: { @@ -1167,15 +1174,15 @@ async Task ReadPongFrameAsync(int size, CancellationToken cancellationToken) } } - async Task ReadStreamConsumedFrameAsync(int size, ulong streamId, CancellationToken cancellationToken) + async Task ReadStreamWindowUpdateFrameAsync(int size, ulong streamId, CancellationToken cancellationToken) { - StreamConsumedBody frame = await ReadFrameBodyAsync( + StreamWindowUpdateBody frame = await ReadFrameBodyAsync( size, - (ref SliceDecoder decoder) => new StreamConsumedBody(ref decoder), + (ref SliceDecoder decoder) => new StreamWindowUpdateBody(ref decoder), cancellationToken).ConfigureAwait(false); if (_streams.TryGetValue(streamId, out SlicStream? stream)) { - stream.ReceivedConsumedFrame(frame); + stream.ReceivedWindowUpdateFrame(frame); } } @@ -1435,7 +1442,7 @@ await _acceptStreamChannel.Writer.WriteAsync( if (stream is not null) { // Let the stream consume the stream frame data. - isDataConsumed = await stream.ReceivedStreamFrameAsync( + isDataConsumed = await stream.ReceivedDataFrameAsync( size, endStream, cancellationToken).ConfigureAwait(false); diff --git a/src/IceRpc/Transports/Slic/Internal/SlicPipeReader.cs b/src/IceRpc/Transports/Slic/Internal/SlicPipeReader.cs index 2f18d9d596..4fd5fb5798 100644 --- a/src/IceRpc/Transports/Slic/Internal/SlicPipeReader.cs +++ b/src/IceRpc/Transports/Slic/Internal/SlicPipeReader.cs @@ -10,7 +10,7 @@ namespace IceRpc.Transports.Slic.Internal; // The SlicPipeReader doesn't override ReadAtLeastAsyncCore or CopyToAsync methods because: // - we can't forward the calls to the internal pipe reader since reading relies on the AdvanceTo implementation to send -// the StreamConsumed frame once the data is examined, +// the StreamWindowUpdate frame once the data is examined, // - the default implementation can't be much optimized. internal class SlicPipeReader : PipeReader { @@ -19,12 +19,11 @@ internal class SlicPipeReader : PipeReader private long _lastExaminedOffset; private readonly Pipe _pipe; private ReadResult _readResult; - private int _receiveCredit; - private readonly int _resumeThreshold; // FlagEnumExtensions operations are used to update the state. These operations are atomic and don't require mutex // locking. private int _state; private readonly SlicStream _stream; + private int _windowSize; public override void AdvanceTo(SequencePosition consumed) => AdvanceTo(consumed, consumed); @@ -42,10 +41,10 @@ public override void AdvanceTo(SequencePosition consumed, SequencePosition exami // If the number of examined bytes is superior to the resume threshold notifies the sender it's safe to send // additional data. - if (_examined >= _resumeThreshold) + if (_examined >= _stream.WindowUpdateThreshold) { - Interlocked.Add(ref _receiveCredit, _examined); - _stream.WriteStreamConsumedFrame(_examined); + Interlocked.Add(ref _windowSize, _examined); + _stream.WriteStreamWindowUpdateFrame(_examined); _examined = 0; } @@ -134,8 +133,7 @@ public override bool TryRead(out ReadResult result) internal SlicPipeReader(SlicStream stream, SlicConnection connection) { _stream = stream; - _resumeThreshold = connection.ResumeWriterThreshold; - _receiveCredit = connection.PauseWriterThreshold; + _windowSize = connection.InitialStreamWindowSize; // We keep the default readerScheduler (ThreadPool) because the _pipe.Writer.FlushAsync executes in the // "read loop task" and we don't want this task to continue into application code. The writerScheduler @@ -172,7 +170,7 @@ internal void CompleteReads(Exception? exception) /// reader on its internal pipe. /// if the data was consumed; otherwise, if the reader was /// completed by the application. - internal async ValueTask ReceivedStreamFrameAsync( + internal async ValueTask ReceivedDataFrameAsync( int dataSize, bool endStream, CancellationToken cancellationToken) @@ -187,7 +185,7 @@ internal async ValueTask ReceivedStreamFrameAsync( if (!_state.TrySetFlag(State.PipeWriterInUse)) { throw new InvalidOperationException( - $"The {nameof(ReceivedStreamFrameAsync)} operation is not thread safe."); + $"The {nameof(ReceivedDataFrameAsync)} operation is not thread safe."); } try @@ -197,8 +195,8 @@ internal async ValueTask ReceivedStreamFrameAsync( return false; // No bytes consumed because the application completed the stream input. } - int newCredit = Interlocked.Add(ref _receiveCredit, -dataSize); - if (newCredit < 0) + int newWindowSize = Interlocked.Add(ref _windowSize, -dataSize); + if (newWindowSize < 0) { throw new IceRpcException( IceRpcError.IceRpcError, @@ -275,7 +273,7 @@ private void ThrowIfCompleted() /// The state enumeration is used to ensure the reader is not used after it's completed and to ensure that /// the internal pipe writer isn't completed concurrently when it's being used by . + /// cref="ReceivedDataFrameAsync" />. private enum State : int { /// was called on this Slic pipe reader. diff --git a/src/IceRpc/Transports/Slic/Internal/SlicPipeWriter.cs b/src/IceRpc/Transports/Slic/Internal/SlicPipeWriter.cs index 6add56a758..e331588031 100644 --- a/src/IceRpc/Transports/Slic/Internal/SlicPipeWriter.cs +++ b/src/IceRpc/Transports/Slic/Internal/SlicPipeWriter.cs @@ -7,7 +7,8 @@ namespace IceRpc.Transports.Slic.Internal; -#pragma warning disable CA1001 // Type owns disposable field(s) '_completeWritesCts' but is not disposable +// Type owns disposable field(s) '_completeWritesCts' and '_sendCreditSemaphore' but is not disposable +#pragma warning disable CA1001 internal class SlicPipeWriter : ReadOnlySequencePipeWriter #pragma warning restore CA1001 { @@ -17,8 +18,8 @@ internal class SlicPipeWriter : ReadOnlySequencePipeWriter private readonly CancellationTokenSource _completeWritesCts = new(); private Exception? _exception; private bool _isCompleted; + private volatile int _peerWindowSize = SlicTransportOptions.MaxWindowSize; private readonly Pipe _pipe; - private volatile int _sendCredit = int.MaxValue; // The semaphore is used when flow control is enabled to wait for additional send credit to be available. private readonly SemaphoreSlim _sendCreditSemaphore = new(1, 1); private readonly SlicStream _stream; @@ -54,7 +55,12 @@ public override void Complete(Exception? exception = null) _pipe.Writer.Complete(); _pipe.Reader.Complete(); - _sendCreditSemaphore.Dispose(); + + // Don't dispose the semaphore. It's not needed and we don't want to have to catch ObjectDisposedException + // from AdjustPeerWindowSize if a StreamWindowUpdate is received after the application completed the stream + // output. An alternative would be to add a lock but it's a bit overkill given than disposing the semaphore + // is only useful when using SemaphoreSlim.AvailableWaitHandle. + // _sendCreditSemaphore.Dispose(); } } @@ -146,7 +152,7 @@ public override async ValueTask WriteAsync( internal SlicPipeWriter(SlicStream stream, SlicConnection connection) { _stream = stream; - _sendCredit = connection.PeerPauseWriterThreshold; + _peerWindowSize = connection.PeerInitialStreamWindowSize; // Create a pipe that never pauses on flush or write. The SlicePipeWriter will pause the flush or write if // the Slic flow control doesn't permit sending more data. @@ -159,14 +165,17 @@ internal SlicPipeWriter(SlicStream stream, SlicConnection connection) useSynchronizationContext: false)); } + /// Acquires send credit. + /// A cancellation token that receives the cancellation requests. + /// The available send credit. + /// The send credit matches the size of the peer's flow-control window. internal async ValueTask AcquireSendCreditAsync(CancellationToken cancellationToken) { // Acquire the semaphore to ensure flow control allows sending additional data. It's important to acquire the - // semaphore before checking _sendCredit. The semaphore acquisition will block if we can't send additional data - // (_sendCredit == 0). Acquiring the semaphore ensures that we are allowed to send additional data and - // _sendCredit can be used to figure out the size of the next packet to send. + // semaphore before checking the peer window size. The semaphore acquisition will block if we can't send + // additional data (_peerWindowSize <= 0). await _sendCreditSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - return _sendCredit; + return _peerWindowSize; } /// Complete writes. @@ -182,35 +191,38 @@ internal void CompleteWrites(Exception? exception) /// The size of the stream frame. internal void ConsumedSendCredit(int size) { - // Decrease the size of remaining data that we are allowed to send. If all the credit is consumed, _sendCredit - // will be 0 and we don't release the semaphore to prevent further sends. The semaphore will be released once - // the stream receives a StreamConsumed frame. - int sendCredit = Interlocked.Add(ref _sendCredit, -size); - if (sendCredit > 0) + Debug.Assert(_sendCreditSemaphore.CurrentCount == 0); // Can only be called with the semaphore acquired. + + // Release the semaphore if the peer's window size is still superior to 0 + int newPeerWindowSize = Interlocked.Add(ref _peerWindowSize, -size); + if (newPeerWindowSize > 0) { _sendCreditSemaphore.Release(); } - Debug.Assert(sendCredit >= 0); } - /// Notifies the writer of the amount of data consumed by peer. - /// The amount of data consumed by the peer. - internal int ReceivedConsumedFrame(int size) + /// Notifies the writer of the reception of a frame. + /// The window size increment. + internal void ReceivedWindowUpdateFrame(int size) { - int newValue = Interlocked.Add(ref _sendCredit, size); - if (newValue == size) + Debug.Assert(size > 0); + + int newPeerWindowSize = Interlocked.Add(ref _peerWindowSize, size); + if (newPeerWindowSize > SlicTransportOptions.MaxWindowSize) { - try - { - Debug.Assert(_sendCreditSemaphore.CurrentCount == 0); - _sendCreditSemaphore.Release(); - } - catch (ObjectDisposedException) - { - // Expected if the writer has been completed. - Debug.Assert(_isCompleted); - } + throw new IceRpcException( + IceRpcError.IceRpcError, + $"The window update is trying to increase the window size to a value larger than allowed."); + } + + int previousPeerWindowSize = newPeerWindowSize - size; + + // A zero peer window size indicates that the last write consumed all the send credit and as a result didn't + // release the semaphore. We can now release the semaphore to allow another write to send data. + if (previousPeerWindowSize == 0) + { + Debug.Assert(_sendCreditSemaphore.CurrentCount == 0); + _sendCreditSemaphore.Release(); } - return newValue; } } diff --git a/src/IceRpc/Transports/Slic/Internal/SlicStream.cs b/src/IceRpc/Transports/Slic/Internal/SlicStream.cs index 29a427f7d7..f8c5ff8007 100644 --- a/src/IceRpc/Transports/Slic/Internal/SlicStream.cs +++ b/src/IceRpc/Transports/Slic/Internal/SlicStream.cs @@ -48,6 +48,8 @@ public ulong Id public Task WritesClosed => _writesClosedTcs.Task; + internal int WindowUpdateThreshold => _connection.StreamWindowUpdateThreshold; + private bool _closeReadsOnWritesClosure; private readonly SlicConnection _connection; private ulong _id = ulong.MaxValue; @@ -93,10 +95,13 @@ internal SlicStream(SlicConnection connection, bool bidirectional, bool remote) } } - /// Acquires send credit. This method should be called to ensure credit is available to send a stream - /// frame. If no send credit is available, it will block until send credit is available. + /// Acquires send credit. /// A cancellation token that receives the cancellation requests. /// The available send credit. + /// This method should be called before sending a or frame to ensure enough send credit is available. If no send credit is available, + /// it will block until send credit is available. The send credit matches the size of the peer's flow-control + /// window. internal ValueTask AcquireSendCreditAsync(CancellationToken cancellationToken) => _outputPipeWriter!.AcquireSendCreditAsync(cancellationToken); @@ -284,7 +289,8 @@ await _connection.WriteStreamFrameAsync( } } - /// Notifies the stream of the amount of data consumed by the connection to send a stream frame. + /// Notifies the stream of the amount of data consumed by the connection to send a or frame. /// The size of the stream frame. internal void ConsumedSendCredit(int size) => _outputPipeWriter!.ConsumedSendCredit(size); @@ -298,19 +304,18 @@ internal ValueTask FillBufferWriterAsync( CancellationToken cancellationToken) => _connection.FillBufferWriterAsync(bufferWriter, byteCount, cancellationToken); - /// Notifies the stream of the reception of a frame. - /// The body of the frame. - internal void ReceivedConsumedFrame(StreamConsumedBody frame) + /// Notifies the stream of the reception of a or frame. + /// The size of the data carried by the stream frame. + /// if the received stream frame is the frame; otherwise, . + /// A cancellation token that receives the cancellation requests. + internal ValueTask ReceivedDataFrameAsync(int size, bool endStream, CancellationToken cancellationToken) { - int newSendCredit = _outputPipeWriter!.ReceivedConsumedFrame((int)frame.Size); - - // Ensure the peer is not trying to increase the credit to a value which is larger than what it is allowed to. - if (newSendCredit > _connection.PeerPauseWriterThreshold) - { - throw new IceRpcException( - IceRpcError.IceRpcError, - "The consumed frame size is trying to increase the credit to a value larger than allowed."); - } + Debug.Assert(_inputPipeReader is not null); + return _state.HasFlag(State.ReadsClosed) ? + new(false) : + _inputPipeReader.ReceivedDataFrameAsync(size, endStream, cancellationToken); } /// Notifies the stream of the reception of a frame. @@ -320,18 +325,17 @@ internal void ReceivedReadsClosedFrame() _outputPipeWriter?.CompleteWrites(exception: null); } - /// Notifies the stream of the reception of a or frame. - /// The size of the data carried by the stream frame. - /// if the received stream frame is the frame; otherwise, . - /// A cancellation token that receives the cancellation requests. - internal ValueTask ReceivedStreamFrameAsync(int size, bool endStream, CancellationToken cancellationToken) + /// Notifies the stream of the reception of a frame. + /// The body of the frame. + internal void ReceivedWindowUpdateFrame(StreamWindowUpdateBody frame) { - Debug.Assert(_inputPipeReader is not null); - return _state.HasFlag(State.ReadsClosed) ? - new(false) : - _inputPipeReader.ReceivedStreamFrameAsync(size, endStream, cancellationToken); + if (frame.WindowSizeIncrement > SlicTransportOptions.MaxWindowSize) + { + throw new IceRpcException( + IceRpcError.IceRpcError, + $"The window update is trying to increase the window size to a value larger than allowed."); + } + _outputPipeWriter!.ReceivedWindowUpdateFrame((int)frame.WindowSizeIncrement); } /// Notifies the stream of the reception of a frame. @@ -343,21 +347,21 @@ internal void ReceivedWritesClosedFrame() _inputPipeReader?.CompleteReads(new IceRpcException(IceRpcError.TruncatedData)); } - /// Writes a frame on the connection. + /// Writes a frame on the connection. /// The amount of data consumed by the application on the stream . - internal void WriteStreamConsumedFrame(int size) + internal void WriteStreamWindowUpdateFrame(int size) { - _ = WriteStreamConsumedFrame(); + _ = WriteStreamWindowUpdateFrame(); - async Task WriteStreamConsumedFrame() + async Task WriteStreamWindowUpdateFrame() { try { // Send the stream consumed frame. await _connection.WriteStreamFrameAsync( stream: this, - FrameType.StreamConsumed, - new StreamConsumedBody((ulong)size).Encode, + FrameType.StreamWindowUpdate, + new StreamWindowUpdateBody((ulong)size).Encode, writeReadsClosedFrame: false).ConfigureAwait(false); } catch (IceRpcException) @@ -366,7 +370,7 @@ await _connection.WriteStreamFrameAsync( } catch (Exception exception) { - Debug.Fail($"Writing of the StreamConsumed frame failed due to an unhandled exception: {exception}"); + Debug.Fail($"Writing of the StreamWindowUpdate frame failed due to an unhandled exception: {exception}"); throw; } } diff --git a/src/IceRpc/Transports/Slic/SlicTransportOptions.cs b/src/IceRpc/Transports/Slic/SlicTransportOptions.cs index 69835e19db..6bb2e9b2c4 100644 --- a/src/IceRpc/Transports/Slic/SlicTransportOptions.cs +++ b/src/IceRpc/Transports/Slic/SlicTransportOptions.cs @@ -28,34 +28,30 @@ public int PacketMaxSize throw new ArgumentException($"The {nameof(PacketMaxSize)} value cannot be less than 1KB.", nameof(value)); } - /// Gets or sets the number of bytes when writes on a Slic stream starts blocking. - /// The pause writer threshold in bytes. It can't be less than 1 KB. Defaults to 64 KB. - public int PauseWriterThreshold + /// Gets or sets the initial stream window size. It defines the initial size of the stream receive buffer + /// for data that has not been consumed yet by the application. When this buffer is full the sender should stop + /// sending additional data. + /// The initial window size in bytes. It can't be less than 1 KB. Defaults to 64 KB. + public int InitialStreamWindowSize { - get => _pauseWriterThreshold; - set => _pauseWriterThreshold = value >= 1024 ? value : + get => _initialStreamWindowSize; + set => _initialStreamWindowSize = + value < 1024 ? throw new ArgumentException( - $"The {nameof(PauseWriterThreshold)} value cannot be less than 1KB.", - nameof(value)); - } - - /// Gets or sets the number of bytes when writes on a Slic stream stops blocking. - /// The resume writer threshold in bytes. It can't be less than 1 KB and greater than . Defaults to 32 KB. - public int ResumeWriterThreshold - { - get => _resumeWriterThreshold; - set => _resumeWriterThreshold = - value < 1024 ? throw new ArgumentException( - $"The {nameof(ResumeWriterThreshold)} value cannot be less than 1KB.", nameof(value)) : - value > _pauseWriterThreshold ? throw new ArgumentException( - $"The {nameof(ResumeWriterThreshold)} value cannot be greater than the {nameof(PauseWriterThreshold)} value.", + $"The {nameof(InitialStreamWindowSize)} value cannot be less than 1 KB.", + nameof(value)) : + value > MaxWindowSize ? + throw new ArgumentException( + $"The {nameof(InitialStreamWindowSize)} value cannot be larger than {MaxWindowSize}.", nameof(value)) : value; } + // We use the HTTP/2 maximum window size (2GB). + internal const int MaxWindowSize = int.MaxValue; + private TimeSpan _idleTimeout = TimeSpan.FromSeconds(30); private int _packetMaxSize = 32768; - private int _pauseWriterThreshold = 65536; - private int _resumeWriterThreshold = 32768; + // The default specified in the HTTP/2 specification. + private int _initialStreamWindowSize = 65_536; } diff --git a/tests/IceRpc.Tests/Transports/Slic/SlicTransportTests.cs b/tests/IceRpc.Tests/Transports/Slic/SlicTransportTests.cs index 3ac333a273..40498e4701 100644 --- a/tests/IceRpc.Tests/Transports/Slic/SlicTransportTests.cs +++ b/tests/IceRpc.Tests/Transports/Slic/SlicTransportTests.cs @@ -603,14 +603,12 @@ public async Task Stream_peer_options_are_set_after_connect() IServiceCollection services = new ServiceCollection().AddSlicTest(); services.AddOptions("server").Configure(options => { - options.PauseWriterThreshold = 6893; - options.ResumeWriterThreshold = 2000; + options.InitialStreamWindowSize = 6893; options.PacketMaxSize = 2098; }); services.AddOptions("client").Configure(options => { - options.PauseWriterThreshold = 2405; - options.ResumeWriterThreshold = 2000; + options.InitialStreamWindowSize = 2405; options.PacketMaxSize = 4567; }); await using ServiceProvider provider = services.BuildServiceProvider(validateScopes: true); @@ -624,8 +622,8 @@ public async Task Stream_peer_options_are_set_after_connect() var serverConnection = (SlicConnection)sut.Server; var clientConnection = (SlicConnection)sut.Client; - Assert.That(serverConnection.PeerPauseWriterThreshold, Is.EqualTo(2405)); - Assert.That(clientConnection.PeerPauseWriterThreshold, Is.EqualTo(6893)); + Assert.That(serverConnection.PeerInitialStreamWindowSize, Is.EqualTo(2405)); + Assert.That(clientConnection.PeerInitialStreamWindowSize, Is.EqualTo(6893)); Assert.That(serverConnection.PeerPacketMaxSize, Is.EqualTo(4567)); Assert.That(clientConnection.PeerPacketMaxSize, Is.EqualTo(2098)); } @@ -633,15 +631,15 @@ public async Task Stream_peer_options_are_set_after_connect() [TestCase(1024 * 32)] [TestCase(1024 * 512)] [TestCase(1024 * 1024)] - public async Task Stream_write_blocks_after_consuming_the_send_credit(int pauseThreshold) + public async Task Stream_write_blocks_after_consuming_the_send_credit(int windowSize) { // Arrange IServiceCollection services = new ServiceCollection().AddSlicTest(); services.AddOptions("server").Configure( - options => options.PauseWriterThreshold = pauseThreshold); + options => options.InitialStreamWindowSize = windowSize); await using ServiceProvider provider = services.BuildServiceProvider(validateScopes: true); - byte[] payload = new byte[pauseThreshold - 1]; + byte[] payload = new byte[windowSize - 1]; var sut = provider.GetRequiredService(); await sut.AcceptAndConnectAsync(); @@ -657,7 +655,7 @@ public async Task Stream_write_blocks_after_consuming_the_send_credit(int pauseT Assert.That(writeTask.IsCompleted, Is.False); // Make sure the writeTask completes before completing the stream output. - ReadResult readResult = await streams.Remote.Input.ReadAtLeastAsync(pauseThreshold - 1); + ReadResult readResult = await streams.Remote.Input.ReadAtLeastAsync(windowSize - 1); Assert.That(readResult.IsCanceled, Is.False); streams.Remote.Input.AdvanceTo(readResult.Buffer.End); await writeTask; @@ -666,13 +664,13 @@ public async Task Stream_write_blocks_after_consuming_the_send_credit(int pauseT [TestCase(32 * 1024)] [TestCase(512 * 1024)] [TestCase(1024 * 1024)] - public async Task Stream_write_blocking_does_not_affect_concurrent_streams(int pauseThreshold) + public async Task Stream_write_blocking_does_not_affect_concurrent_streams(int windowSize) { // Arrange - byte[] payload = new byte[pauseThreshold - 1]; + byte[] payload = new byte[windowSize - 1]; IServiceCollection services = new ServiceCollection().AddSlicTest(); services.AddOptions("server").Configure( - options => options.PauseWriterThreshold = pauseThreshold); + options => options.InitialStreamWindowSize = windowSize); await using ServiceProvider provider = services.BuildServiceProvider(validateScopes: true); var sut = provider.GetRequiredService(); @@ -690,14 +688,14 @@ public async Task Stream_write_blocking_does_not_affect_concurrent_streams(int p // Assert Assert.That(streams2.Local.Id, Is.EqualTo(streams2.Remote.Id)); - ReadResult readResult = await streams2.Remote.Input.ReadAtLeastAsync(pauseThreshold - 1); + ReadResult readResult = await streams2.Remote.Input.ReadAtLeastAsync(windowSize - 1); Assert.That(readResult.IsCanceled, Is.False); streams2.Remote.Input.AdvanceTo(readResult.Buffer.End); await Task.Delay(TimeSpan.FromMilliseconds(50)); Assert.That(writeTask.IsCompleted, Is.False); // Make sure the writeTask completes before completing the stream output. - readResult = await streams1.Remote.Input.ReadAtLeastAsync(pauseThreshold - 1); + readResult = await streams1.Remote.Input.ReadAtLeastAsync(windowSize - 1); Assert.That(readResult.IsCanceled, Is.False); streams1.Remote.Input.AdvanceTo(readResult.Buffer.End); await writeTask; @@ -735,20 +733,15 @@ public async Task Stream_write_cancellation() duplexClientConnection.Operations.Hold = DuplexTransportOperations.None; } - [TestCase(64 * 1024, 32 * 1024)] - [TestCase(1024 * 1024, 512 * 1024)] - [TestCase(2048 * 1024, 512 * 1024)] - public async Task Write_resumes_after_reaching_the_resume_writer_threshold( - int pauseThreshold, - int resumeThreshold) + [Test] + public async Task Write_resumes_after_reaching_the_resume_writer_threshold() { + int windowSize = 32 * 1024; + // Arrange IServiceCollection services = new ServiceCollection().AddSlicTest(); - services.AddOptions("server").Configure(options => - { - options.PauseWriterThreshold = pauseThreshold; - options.ResumeWriterThreshold = resumeThreshold; - }); + services.AddOptions("server").Configure( + options => options.InitialStreamWindowSize = windowSize); await using ServiceProvider provider = services.BuildServiceProvider(validateScopes: true); var sut = provider.GetRequiredService(); @@ -756,11 +749,11 @@ public async Task Write_resumes_after_reaching_the_resume_writer_threshold( using var streams = await sut.CreateAndAcceptStreamAsync(); - ValueTask writeTask = streams.Local.Output.WriteAsync(new byte[pauseThreshold], default); - ReadResult readResult = await streams.Remote.Input.ReadAtLeastAsync(pauseThreshold - resumeThreshold, default); + ValueTask writeTask = streams.Local.Output.WriteAsync(new byte[windowSize + 1024], default); + ReadResult readResult = await streams.Remote.Input.ReadAtLeastAsync(windowSize, default); // Act - streams.Remote.Input.AdvanceTo(readResult.Buffer.GetPosition(pauseThreshold - resumeThreshold)); + streams.Remote.Input.AdvanceTo(readResult.Buffer.GetPosition(windowSize)); // Assert Assert.That(async () => await writeTask, Throws.Nothing);