Skip to content

Commit

Permalink
Support for StreamReceiveWindowSize, fixes #3316 (#3371)
Browse files Browse the repository at this point in the history
  • Loading branch information
bentoi authored Jun 16, 2023
1 parent cd77a90 commit c7f9766
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 157 deletions.
67 changes: 37 additions & 30 deletions src/IceRpc/Transports/Slic/Internal/SlicConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,28 @@ internal class SlicConnection : IMultiplexedConnection
/// <summary>Gets the minimum size of the segment requested from <see cref="Pool" />.</summary>
internal int MinSegmentSize { get; }

internal int PauseWriterThreshold { get; }

/// <summary>Gets the maximum size of packets accepted by the peer.</summary>
/// <summary>Gets the maximum size of packets accepted by the peer. This property is set to the <see
/// cref="ParameterKey.PacketMaxSize"/> value carried by the <see cref="FrameType.Initialize" /> frame.</summary>
internal int PeerPacketMaxSize { get; private set; }

// TODO: replace with a window size property
internal int PeerPauseWriterThreshold { get; private set; }
/// <summary>Gets the peer's initial stream window size. This property is set to the <see
/// cref="ParameterKey.InitialStreamWindowSize"/> value carried by the <see cref="FrameType.Initialize" />
/// frame.</summary>
internal int PeerInitialStreamWindowSize { get; private set; }

/// <summary>Gets the <see cref="MemoryPool{T}" /> used for obtaining memory buffers.</summary>
internal MemoryPool<byte> Pool { get; }

// TODO: replace with a window size property
internal int ResumeWriterThreshold { get; }
/// <summary>Gets the initial stream window size.</summary>
internal int InitialStreamWindowSize { get; }

/// <summary>Gets the window update threshold. When the window size is increased and this threshold reached, a <see
/// cref="FrameType.StreamWindowUpdate" /> frame is sent.</summary>
internal int StreamWindowUpdateThreshold => InitialStreamWindowSize / StreamWindowUpdateRatio;

// The ratio used to compute the StreamWindowUpdateThreshold. For now, the stream window update is sent when the
// window size grows over InitialStreamWindowSize / StreamWindowUpdateRatio.
private const int StreamWindowUpdateRatio = 2;

private readonly Channel<IMultiplexedStream> _acceptStreamChannel;
private int _bidirectionalStreamCount;
Expand Down Expand Up @@ -560,8 +569,7 @@ internal SlicConnection(
_maxBidirectionalStreams = options.MaxBidirectionalStreams;
_maxUnidirectionalStreams = options.MaxUnidirectionalStreams;

PauseWriterThreshold = slicOptions.PauseWriterThreshold;
ResumeWriterThreshold = slicOptions.ResumeWriterThreshold;
InitialStreamWindowSize = slicOptions.InitialStreamWindowSize;
_localIdleTimeout = slicOptions.IdleTimeout;
_packetMaxSize = slicOptions.PacketMaxSize;
_acceptStreamChannel = Channel.CreateUnbounded<IMultiplexedStream>(new UnboundedChannelOptions
Expand All @@ -585,7 +593,6 @@ internal SlicConnection(
// Initially set the peer packet max size to the local max size to ensure we can receive the first initialize
// frame.
PeerPacketMaxSize = _packetMaxSize;
PeerPauseWriterThreshold = PauseWriterThreshold;

// We use the same stream ID numbering scheme as Quic.
if (IsServer)
Expand Down Expand Up @@ -792,8 +799,8 @@ internal async ValueTask<FlushResult> WriteStreamDataFrameAsync(
}

// Notify the stream that we're consuming sendSize credit. It's important to call this before sending
// the stream frame to avoid race conditions where the StreamConsumed frame could be received before the
// send credit was updated.
// the stream frame to avoid race conditions where the StreamWindowUpdate frame could be received before
// the send credit was updated.
if (sendCredit > 0)
{
stream.ConsumedSendCredit((int)(sendSource1.Length + sendSource2.Length));
Expand Down Expand Up @@ -900,7 +907,7 @@ private void AddStream(ulong id, SlicStream stream)
private void DecodeParameters(IDictionary<ParameterKey, IList<byte>> parameters)
{
int? peerPacketMaxSize = null;
int? peerPauseWriterThreshold = null;
int? peerInitialStreamWindowSize = null;
foreach ((ParameterKey key, IList<byte> buffer) in parameters)
{
switch (key)
Expand Down Expand Up @@ -939,17 +946,17 @@ private void DecodeParameters(IDictionary<ParameterKey, IList<byte>> parameters)
if (peerPacketMaxSize < 1024)
{
throw new InvalidDataException(
"The PacketMaxSize Slic connection parameter is invalid, it must be greater than 1KB.");
"The PacketMaxSize connection parameter is invalid, it must be greater than 1KB.");
}
break;
}
case ParameterKey.PauseWriterThreshold:
case ParameterKey.InitialStreamWindowSize:
{
peerPauseWriterThreshold = DecodeParamValue(buffer);
if (peerPauseWriterThreshold < 1024)
peerInitialStreamWindowSize = DecodeParamValue(buffer);
if (peerInitialStreamWindowSize < 1024)
{
throw new InvalidDataException(
"The PauseWriterThreshold Slic connection parameter is invalid, it must be greater than 1KB.");
"The InitialStreamWindowSize connection parameter is invalid, it must be greater than 1KB.");
}
break;
}
Expand All @@ -960,21 +967,21 @@ private void DecodeParameters(IDictionary<ParameterKey, IList<byte>> parameters)
if (peerPacketMaxSize is null)
{
throw new InvalidDataException(
"The peer didn't send the required PacketMaxSize Slic connection parameter.");
"The peer didn't send the required PacketMaxSize connection parameter.");
}
else
{
PeerPacketMaxSize = peerPacketMaxSize.Value;
}

if (peerPauseWriterThreshold is null)
if (peerInitialStreamWindowSize is null)
{
throw new InvalidDataException(
"The peer didn't send the required PauseWriterThreshold Slic connection parameter.");
"The peer didn't send the required InitialStreamWindowSize connection parameter.");
}
else
{
PeerPauseWriterThreshold = peerPauseWriterThreshold.Value;
PeerInitialStreamWindowSize = peerInitialStreamWindowSize.Value;
}

// all parameter values are currently integers in the range 0..Int32Max encoded as varuint62.
Expand All @@ -1001,7 +1008,7 @@ private Dictionary<ParameterKey, IList<byte>> EncodeParameters()
{
// Required parameters.
EncodeParameter(ParameterKey.PacketMaxSize, (ulong)_packetMaxSize),
EncodeParameter(ParameterKey.PauseWriterThreshold, (ulong)PauseWriterThreshold)
EncodeParameter(ParameterKey.InitialStreamWindowSize, (ulong)InitialStreamWindowSize)
};

// Optional parameters.
Expand Down Expand Up @@ -1055,9 +1062,9 @@ private Task ReadFrameAsync(FrameType type, int size, ulong? streamId, Cancellat
{
return ReadStreamDataFrameAsync(type, size, streamId!.Value, cancellationToken);
}
case FrameType.StreamConsumed:
case FrameType.StreamWindowUpdate:
{
return ReadStreamConsumedFrameAsync(size, streamId!.Value, cancellationToken);
return ReadStreamWindowUpdateFrameAsync(size, streamId!.Value, cancellationToken);
}
case FrameType.StreamReadsClosed:
{
Expand Down Expand Up @@ -1167,15 +1174,15 @@ async Task ReadPongFrameAsync(int size, CancellationToken cancellationToken)
}
}

async Task ReadStreamConsumedFrameAsync(int size, ulong streamId, CancellationToken cancellationToken)
async Task ReadStreamWindowUpdateFrameAsync(int size, ulong streamId, CancellationToken cancellationToken)
{
StreamConsumedBody frame = await ReadFrameBodyAsync(
StreamWindowUpdateBody frame = await ReadFrameBodyAsync(
size,
(ref SliceDecoder decoder) => new StreamConsumedBody(ref decoder),
(ref SliceDecoder decoder) => new StreamWindowUpdateBody(ref decoder),
cancellationToken).ConfigureAwait(false);
if (_streams.TryGetValue(streamId, out SlicStream? stream))
{
stream.ReceivedConsumedFrame(frame);
stream.ReceivedWindowUpdateFrame(frame);
}
}

Expand Down Expand Up @@ -1435,7 +1442,7 @@ await _acceptStreamChannel.Writer.WriteAsync(
if (stream is not null)
{
// Let the stream consume the stream frame data.
isDataConsumed = await stream.ReceivedStreamFrameAsync(
isDataConsumed = await stream.ReceivedDataFrameAsync(
size,
endStream,
cancellationToken).ConfigureAwait(false);
Expand Down
24 changes: 11 additions & 13 deletions src/IceRpc/Transports/Slic/Internal/SlicPipeReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace IceRpc.Transports.Slic.Internal;

// The SlicPipeReader doesn't override ReadAtLeastAsyncCore or CopyToAsync methods because:
// - we can't forward the calls to the internal pipe reader since reading relies on the AdvanceTo implementation to send
// the StreamConsumed frame once the data is examined,
// the StreamWindowUpdate frame once the data is examined,
// - the default implementation can't be much optimized.
internal class SlicPipeReader : PipeReader
{
Expand All @@ -19,12 +19,11 @@ internal class SlicPipeReader : PipeReader
private long _lastExaminedOffset;
private readonly Pipe _pipe;
private ReadResult _readResult;
private int _receiveCredit;
private readonly int _resumeThreshold;
// FlagEnumExtensions operations are used to update the state. These operations are atomic and don't require mutex
// locking.
private int _state;
private readonly SlicStream _stream;
private int _windowSize;

public override void AdvanceTo(SequencePosition consumed) => AdvanceTo(consumed, consumed);

Expand All @@ -42,10 +41,10 @@ public override void AdvanceTo(SequencePosition consumed, SequencePosition exami

// If the number of examined bytes is superior to the resume threshold notifies the sender it's safe to send
// additional data.
if (_examined >= _resumeThreshold)
if (_examined >= _stream.WindowUpdateThreshold)
{
Interlocked.Add(ref _receiveCredit, _examined);
_stream.WriteStreamConsumedFrame(_examined);
Interlocked.Add(ref _windowSize, _examined);
_stream.WriteStreamWindowUpdateFrame(_examined);
_examined = 0;
}

Expand Down Expand Up @@ -134,8 +133,7 @@ public override bool TryRead(out ReadResult result)
internal SlicPipeReader(SlicStream stream, SlicConnection connection)
{
_stream = stream;
_resumeThreshold = connection.ResumeWriterThreshold;
_receiveCredit = connection.PauseWriterThreshold;
_windowSize = connection.InitialStreamWindowSize;

// We keep the default readerScheduler (ThreadPool) because the _pipe.Writer.FlushAsync executes in the
// "read loop task" and we don't want this task to continue into application code. The writerScheduler
Expand Down Expand Up @@ -172,7 +170,7 @@ internal void CompleteReads(Exception? exception)
/// reader on its internal pipe.</summary>
/// <returns><see langword="true" /> if the data was consumed; otherwise, <see langword="false"/> if the reader was
/// completed by the application.</returns>
internal async ValueTask<bool> ReceivedStreamFrameAsync(
internal async ValueTask<bool> ReceivedDataFrameAsync(
int dataSize,
bool endStream,
CancellationToken cancellationToken)
Expand All @@ -187,7 +185,7 @@ internal async ValueTask<bool> ReceivedStreamFrameAsync(
if (!_state.TrySetFlag(State.PipeWriterInUse))
{
throw new InvalidOperationException(
$"The {nameof(ReceivedStreamFrameAsync)} operation is not thread safe.");
$"The {nameof(ReceivedDataFrameAsync)} operation is not thread safe.");
}

try
Expand All @@ -197,8 +195,8 @@ internal async ValueTask<bool> ReceivedStreamFrameAsync(
return false; // No bytes consumed because the application completed the stream input.
}

int newCredit = Interlocked.Add(ref _receiveCredit, -dataSize);
if (newCredit < 0)
int newWindowSize = Interlocked.Add(ref _windowSize, -dataSize);
if (newWindowSize < 0)
{
throw new IceRpcException(
IceRpcError.IceRpcError,
Expand Down Expand Up @@ -275,7 +273,7 @@ private void ThrowIfCompleted()

/// <summary>The state enumeration is used to ensure the reader is not used after it's completed and to ensure that
/// the internal pipe writer isn't completed concurrently when it's being used by <see
/// cref="ReceivedStreamFrameAsync" />.</summary>
/// cref="ReceivedDataFrameAsync" />.</summary>
private enum State : int
{
/// <summary><see cref="Complete" /> was called on this Slic pipe reader.</summary>
Expand Down
72 changes: 42 additions & 30 deletions src/IceRpc/Transports/Slic/Internal/SlicPipeWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

namespace IceRpc.Transports.Slic.Internal;

#pragma warning disable CA1001 // Type owns disposable field(s) '_completeWritesCts' but is not disposable
// Type owns disposable field(s) '_completeWritesCts' and '_sendCreditSemaphore' but is not disposable
#pragma warning disable CA1001
internal class SlicPipeWriter : ReadOnlySequencePipeWriter
#pragma warning restore CA1001
{
Expand All @@ -17,8 +18,8 @@ internal class SlicPipeWriter : ReadOnlySequencePipeWriter
private readonly CancellationTokenSource _completeWritesCts = new();
private Exception? _exception;
private bool _isCompleted;
private volatile int _peerWindowSize = SlicTransportOptions.MaxWindowSize;
private readonly Pipe _pipe;
private volatile int _sendCredit = int.MaxValue;
// The semaphore is used when flow control is enabled to wait for additional send credit to be available.
private readonly SemaphoreSlim _sendCreditSemaphore = new(1, 1);
private readonly SlicStream _stream;
Expand Down Expand Up @@ -54,7 +55,12 @@ public override void Complete(Exception? exception = null)

_pipe.Writer.Complete();
_pipe.Reader.Complete();
_sendCreditSemaphore.Dispose();

// Don't dispose the semaphore. It's not needed and we don't want to have to catch ObjectDisposedException
// from AdjustPeerWindowSize if a StreamWindowUpdate is received after the application completed the stream
// output. An alternative would be to add a lock but it's a bit overkill given than disposing the semaphore
// is only useful when using SemaphoreSlim.AvailableWaitHandle.
// _sendCreditSemaphore.Dispose();
}
}

Expand Down Expand Up @@ -146,7 +152,7 @@ public override async ValueTask<FlushResult> WriteAsync(
internal SlicPipeWriter(SlicStream stream, SlicConnection connection)
{
_stream = stream;
_sendCredit = connection.PeerPauseWriterThreshold;
_peerWindowSize = connection.PeerInitialStreamWindowSize;

// Create a pipe that never pauses on flush or write. The SlicePipeWriter will pause the flush or write if
// the Slic flow control doesn't permit sending more data.
Expand All @@ -159,14 +165,17 @@ internal SlicPipeWriter(SlicStream stream, SlicConnection connection)
useSynchronizationContext: false));
}

/// <summary>Acquires send credit.</summary>
/// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
/// <returns>The available send credit.</returns>
/// <remarks>The send credit matches the size of the peer's flow-control window.</remarks>
internal async ValueTask<int> AcquireSendCreditAsync(CancellationToken cancellationToken)
{
// Acquire the semaphore to ensure flow control allows sending additional data. It's important to acquire the
// semaphore before checking _sendCredit. The semaphore acquisition will block if we can't send additional data
// (_sendCredit == 0). Acquiring the semaphore ensures that we are allowed to send additional data and
// _sendCredit can be used to figure out the size of the next packet to send.
// semaphore before checking the peer window size. The semaphore acquisition will block if we can't send
// additional data (_peerWindowSize <= 0).
await _sendCreditSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
return _sendCredit;
return _peerWindowSize;
}

/// <summary>Complete writes.</summary>
Expand All @@ -182,35 +191,38 @@ internal void CompleteWrites(Exception? exception)
/// <param name="size">The size of the stream frame.</param>
internal void ConsumedSendCredit(int size)
{
// Decrease the size of remaining data that we are allowed to send. If all the credit is consumed, _sendCredit
// will be 0 and we don't release the semaphore to prevent further sends. The semaphore will be released once
// the stream receives a StreamConsumed frame.
int sendCredit = Interlocked.Add(ref _sendCredit, -size);
if (sendCredit > 0)
Debug.Assert(_sendCreditSemaphore.CurrentCount == 0); // Can only be called with the semaphore acquired.

// Release the semaphore if the peer's window size is still superior to 0
int newPeerWindowSize = Interlocked.Add(ref _peerWindowSize, -size);
if (newPeerWindowSize > 0)
{
_sendCreditSemaphore.Release();
}
Debug.Assert(sendCredit >= 0);
}

/// <summary>Notifies the writer of the amount of data consumed by peer.</summary>
/// <param name="size">The amount of data consumed by the peer.</param>
internal int ReceivedConsumedFrame(int size)
/// <summary>Notifies the writer of the reception of a <see cref="FrameType.StreamWindowUpdate" /> frame.</summary>
/// <param name="size">The window size increment.</param>
internal void ReceivedWindowUpdateFrame(int size)
{
int newValue = Interlocked.Add(ref _sendCredit, size);
if (newValue == size)
Debug.Assert(size > 0);

int newPeerWindowSize = Interlocked.Add(ref _peerWindowSize, size);
if (newPeerWindowSize > SlicTransportOptions.MaxWindowSize)
{
try
{
Debug.Assert(_sendCreditSemaphore.CurrentCount == 0);
_sendCreditSemaphore.Release();
}
catch (ObjectDisposedException)
{
// Expected if the writer has been completed.
Debug.Assert(_isCompleted);
}
throw new IceRpcException(
IceRpcError.IceRpcError,
$"The window update is trying to increase the window size to a value larger than allowed.");
}

int previousPeerWindowSize = newPeerWindowSize - size;

// A zero peer window size indicates that the last write consumed all the send credit and as a result didn't
// release the semaphore. We can now release the semaphore to allow another write to send data.
if (previousPeerWindowSize == 0)
{
Debug.Assert(_sendCreditSemaphore.CurrentCount == 0);
_sendCreditSemaphore.Release();
}
return newValue;
}
}
Loading

0 comments on commit c7f9766

Please sign in to comment.