Skip to content

Commit

Permalink
Update idle check in C# to work with an exhausted (flow controlled) t…
Browse files Browse the repository at this point in the history
…hread pool (#2592)
  • Loading branch information
bernardnormier authored Jul 31, 2024
1 parent 765d815 commit b38df0a
Show file tree
Hide file tree
Showing 12 changed files with 95 additions and 18 deletions.
34 changes: 24 additions & 10 deletions csharp/src/Ice/ConnectionI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1434,26 +1434,40 @@ internal ConnectionI(

/// <summary>Aborts the connection with a <see cref="ConnectionAbortedException" /> if the connection is active or
/// holding.</summary>
internal void idleCheck(TimeSpan idleTimeout)
internal void idleCheck(TimeSpan idleTimeout, Action rescheduleTimer)
{
lock (this)
{
if (_state == StateActive || _state == StateHolding)
{
int idleTimeoutInSeconds = (int)idleTimeout.TotalSeconds;

if (_instance.traceLevels().network >= 1)
if (_transceiver.isWaitingToBeRead)
{
_instance.initializationData().logger.trace(
_instance.traceLevels().networkCat,
$"connection aborted by the idle check because it did not receive any bytes for {idleTimeoutInSeconds}s\n{_transceiver.toDetailedString()}");
rescheduleTimer();

if (_instance.traceLevels().network >= 3)
{
_instance.initializationData().logger.trace(
_instance.traceLevels().networkCat,
$"the idle check scheduled a new idle check in {idleTimeoutInSeconds}s because the connection is waiting to be read\n{_transceiver.toDetailedString()}");
}
}
else
{
if (_instance.traceLevels().network >= 1)
{
_instance.initializationData().logger.trace(
_instance.traceLevels().networkCat,
$"connection aborted by the idle check because it did not receive any bytes for {idleTimeoutInSeconds}s\n{_transceiver.toDetailedString()}");
}

setState(
StateClosed,
new ConnectionAbortedException(
$"Connection aborted by the idle check because it did not receive any bytes for {idleTimeoutInSeconds}s.",
closedByApplication: false));
setState(
StateClosed,
new ConnectionAbortedException(
$"Connection aborted by the idle check because it did not receive any bytes for {idleTimeoutInSeconds}s.",
closedByApplication: false));
}
}
// else nothing to do
}
Expand Down
4 changes: 3 additions & 1 deletion csharp/src/Ice/Internal/IdleTimeoutTransceiverDecorator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ namespace Ice.Internal;

internal sealed class IdleTimeoutTransceiverDecorator : Transceiver
{
public bool isWaitingToBeRead => _decoratee.isWaitingToBeRead;

private readonly Transceiver _decoratee;
private readonly TimeSpan _idleTimeout;
private readonly System.Threading.Timer? _readTimer;
Expand Down Expand Up @@ -106,7 +108,7 @@ internal IdleTimeoutTransceiverDecorator(Transceiver decoratee, ConnectionI conn

if (enableIdleCheck)
{
_readTimer = new System.Threading.Timer(_ => connection.idleCheck(_idleTimeout));
_readTimer = new System.Threading.Timer(_ => connection.idleCheck(_idleTimeout, rescheduleReadTimer));
}

_writeTimer = new System.Threading.Timer(_ => connection.sendHeartbeat());
Expand Down
21 changes: 21 additions & 0 deletions csharp/src/Ice/Internal/StreamSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,21 @@ namespace Ice.Internal;

public sealed class StreamSocket
{
internal bool isWaitingToBeRead
{
get
{
try
{
return _pendingRead.IsSet || _fd.Available > 0;
}
catch
{
return false;
}
}
}

public StreamSocket(ProtocolInstance instance, NetworkProxy proxy, EndPoint addr, EndPoint sourceAddr)
{
_instance = instance;
Expand Down Expand Up @@ -193,6 +208,8 @@ public void finishRead(Buffer buf)
Debug.Assert(_fd != null && _readEventArgs != null);
try
{
_pendingRead.Reset();

if (_readEventArgs.SocketError != SocketError.Success)
{
throw new System.Net.Sockets.SocketException((int)_readEventArgs.SocketError);
Expand Down Expand Up @@ -347,6 +364,7 @@ public void destroy()
Debug.Assert(_readEventArgs != null && _writeEventArgs != null);
_readEventArgs.Dispose();
_writeEventArgs.Dispose();
_pendingRead.Dispose();
}

public override string ToString()
Expand Down Expand Up @@ -443,6 +461,7 @@ private void ioCompleted(object sender, SocketAsyncEventArgs e)
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
_pendingRead.Set();
_readCallback(e.UserToken);
break;
case SocketAsyncOperation.Send:
Expand Down Expand Up @@ -505,6 +524,8 @@ private int toState(int operation)
private AsyncCallback _writeCallback;
private AsyncCallback _readCallback;

private readonly ManualResetEventSlim _pendingRead = new ManualResetEventSlim(false);

private const int StateNeedConnect = 0;
private const int StateConnectPending = 1;
private const int StateProxyWrite = 2;
Expand Down
2 changes: 2 additions & 0 deletions csharp/src/Ice/Internal/TcpTransceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ namespace Ice.Internal;

internal sealed class TcpTransceiver : Transceiver
{
public bool isWaitingToBeRead => _stream.isWaitingToBeRead;

public Socket fd()
{
return _stream.fd();
Expand Down
5 changes: 5 additions & 0 deletions csharp/src/Ice/Internal/Transceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ namespace Ice.Internal;

public interface Transceiver
{
/// <summary>
/// Checks if this transceiver is waiting to be read, typically because it has bytes readily available for reading.
/// </summary>
bool isWaitingToBeRead { get; }

Socket fd();
int initialize(Buffer readBuffer, Buffer writeBuffer, ref bool hasMoreData);
int closing(bool initiator, Ice.LocalException ex);
Expand Down
9 changes: 9 additions & 0 deletions csharp/src/Ice/Internal/UdpTransceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ namespace Ice.Internal;

internal sealed class UdpTransceiver : Transceiver
{
public bool isWaitingToBeRead
{
get
{
Debug.Fail("UdpTransceiver does not implement isWaitingToBeRead");
return false;
}
}

public Socket fd()
{
return _fd;
Expand Down
2 changes: 2 additions & 0 deletions csharp/src/Ice/Internal/WSTransceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ namespace Ice.Internal;

internal sealed class WSTransceiver : Transceiver
{
public bool isWaitingToBeRead => _readBuffer.b.position() > _readBufferPos || _delegate.isWaitingToBeRead;

public Socket fd()
{
return _delegate.fd();
Expand Down
19 changes: 17 additions & 2 deletions csharp/src/Ice/SSL/TransceiverI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ namespace Ice.SSL;

internal sealed class TransceiverI : Ice.Internal.Transceiver
{
public bool isWaitingToBeRead => _pendingRead.IsSet || _delegate.isWaitingToBeRead;

public Socket fd() => _delegate.fd();

public int initialize(Ice.Internal.Buffer readBuffer, Ice.Internal.Buffer writeBuffer, ref bool hasMoreData)
Expand Down Expand Up @@ -86,7 +88,11 @@ public Ice.Internal.EndpointI bind()
return null;
}

public void destroy() => _delegate.destroy();
public void destroy()
{
_delegate.destroy();
_pendingRead.Dispose();
}

public int write(Ice.Internal.Buffer buf) =>
// Force caller to use async write.
Expand All @@ -109,7 +115,13 @@ public bool startRead(Ice.Internal.Buffer buf, Ice.Internal.AsyncCallback callba
try
{
_readResult = _sslStream.ReadAsync(buf.b.rawBytes(), buf.b.position(), packetSz);
_readResult.ContinueWith(task => callback(state), TaskScheduler.Default);
_readResult.ContinueWith(
task =>
{
_pendingRead.Set();
callback(state);
},
TaskScheduler.Default);
return false;
}
catch (IOException ex)
Expand Down Expand Up @@ -150,6 +162,8 @@ public void finishRead(Ice.Internal.Buffer buf)
Debug.Assert(_readResult != null);
try
{
_pendingRead.Reset();

int ret;
try
{
Expand Down Expand Up @@ -493,6 +507,7 @@ public int getRecvPacketSize(int length) =>
private bool _authenticated;
private Task _writeResult;
private Task<int> _readResult;
private readonly ManualResetEventSlim _pendingRead = new ManualResetEventSlim(false);
private int _maxSendPacketSize;
private int _maxRecvPacketSize;
private string _cipher;
Expand Down
3 changes: 3 additions & 0 deletions csharp/test/Ice/background/Transceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

internal class Transceiver : Ice.Internal.Transceiver
{
public bool isWaitingToBeRead =>
(_buffered && (_readBuffer.b.position() - _readBufferPos > 0)) || _transceiver.isWaitingToBeRead;

public Socket fd()
{
return _transceiver.fd();
Expand Down
8 changes: 5 additions & 3 deletions csharp/test/Ice/idleTimeout/AllTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ internal static async Task allTests(global::Test.TestHelper helper)

string proxyString3s = $"test: {helper.getTestEndpoint(1)}";

// TODO: this test no longer works with the thread pool fix, idle timeout implementation needs fixing.
// await testIdleCheckDoesNotAbortConnectionWhenThreadPoolIsExhausted(p, helper.getWriter());
await testIdleCheckDoesNotAbortConnectionWhenThreadPoolIsExhausted(p, helper.getWriter());
await testConnectionAbortedByIdleCheck(proxyString, communicator.getProperties(), helper.getWriter());
await testEnableDisableIdleCheck(true, proxyString3s, communicator.getProperties(), helper.getWriter());
await testEnableDisableIdleCheck(false, proxyString3s, communicator.getProperties(), helper.getWriter());
Expand All @@ -39,7 +38,10 @@ private static async Task testIdleCheckDoesNotAbortConnectionWhenThreadPoolIsExh
// Establish connection.
await p.ice_pingAsync();

await p.sleepAsync(2000); // the implementation in the server sleeps for 2,000ms
await p.sleepAsync(4000); // the implementation in the server sleeps for 2,000ms

// close connection
p.ice_getConnection().close(ConnectionClose.GracefullyWithWait);
output.WriteLine("ok");
}

Expand Down
3 changes: 3 additions & 0 deletions csharp/test/Ice/idleTimeout/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ public override async Task runAsync(string[] args)
{
var properties = createTestProperties(ref args);
properties.setProperty("Ice.Connection.IdleTimeout", "1");

// TODO: temporary work-around for IceSSL always sending messages asynchronously.
properties.setProperty("Ice.Connection.EnableIdleCheck", "0");
using var communicator = initialize(properties);
await AllTests.allTests(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ public interface Transceiver {

/**
* Checks if this transceiver is waiting to be read, typically because it has bytes readily
* available for reading. The caller must ensure the transceiver is not closed when calling this
* method.
* available for reading.
*
* @return true if this transceiver is waiting to be read, false otherwise.
*/
Expand Down

0 comments on commit b38df0a

Please sign in to comment.