diff --git a/csharp/src/Ice/ConnectionI.cs b/csharp/src/Ice/ConnectionI.cs index 07f1aa41918..047e476e090 100644 --- a/csharp/src/Ice/ConnectionI.cs +++ b/csharp/src/Ice/ConnectionI.cs @@ -1434,7 +1434,7 @@ internal ConnectionI( /// Aborts the connection with a if the connection is active or /// holding. - internal void idleCheck(TimeSpan idleTimeout) + internal void idleCheck(TimeSpan idleTimeout, Action rescheduleTimer) { lock (this) { @@ -1442,18 +1442,32 @@ internal void idleCheck(TimeSpan idleTimeout) { int idleTimeoutInSeconds = (int)idleTimeout.TotalSeconds; - if (_instance.traceLevels().network >= 1) + if (_transceiver.isWaitingToBeRead) { - _instance.initializationData().logger.trace( - _instance.traceLevels().networkCat, - $"connection aborted by the idle check because it did not receive any bytes for {idleTimeoutInSeconds}s\n{_transceiver.toDetailedString()}"); + rescheduleTimer(); + + if (_instance.traceLevels().network >= 3) + { + _instance.initializationData().logger.trace( + _instance.traceLevels().networkCat, + $"the idle check scheduled a new idle check in {idleTimeoutInSeconds}s because the connection is waiting to be read\n{_transceiver.toDetailedString()}"); + } } + else + { + if (_instance.traceLevels().network >= 1) + { + _instance.initializationData().logger.trace( + _instance.traceLevels().networkCat, + $"connection aborted by the idle check because it did not receive any bytes for {idleTimeoutInSeconds}s\n{_transceiver.toDetailedString()}"); + } - setState( - StateClosed, - new ConnectionAbortedException( - $"Connection aborted by the idle check because it did not receive any bytes for {idleTimeoutInSeconds}s.", - closedByApplication: false)); + setState( + StateClosed, + new ConnectionAbortedException( + $"Connection aborted by the idle check because it did not receive any bytes for {idleTimeoutInSeconds}s.", + closedByApplication: false)); + } } // else nothing to do } diff --git a/csharp/src/Ice/Internal/IdleTimeoutTransceiverDecorator.cs b/csharp/src/Ice/Internal/IdleTimeoutTransceiverDecorator.cs index 62738861889..54f80d03f1d 100644 --- a/csharp/src/Ice/Internal/IdleTimeoutTransceiverDecorator.cs +++ b/csharp/src/Ice/Internal/IdleTimeoutTransceiverDecorator.cs @@ -9,6 +9,8 @@ namespace Ice.Internal; internal sealed class IdleTimeoutTransceiverDecorator : Transceiver { + public bool isWaitingToBeRead => _decoratee.isWaitingToBeRead; + private readonly Transceiver _decoratee; private readonly TimeSpan _idleTimeout; private readonly System.Threading.Timer? _readTimer; @@ -106,7 +108,7 @@ internal IdleTimeoutTransceiverDecorator(Transceiver decoratee, ConnectionI conn if (enableIdleCheck) { - _readTimer = new System.Threading.Timer(_ => connection.idleCheck(_idleTimeout)); + _readTimer = new System.Threading.Timer(_ => connection.idleCheck(_idleTimeout, rescheduleReadTimer)); } _writeTimer = new System.Threading.Timer(_ => connection.sendHeartbeat()); diff --git a/csharp/src/Ice/Internal/StreamSocket.cs b/csharp/src/Ice/Internal/StreamSocket.cs index 12913269f60..f3455624a12 100644 --- a/csharp/src/Ice/Internal/StreamSocket.cs +++ b/csharp/src/Ice/Internal/StreamSocket.cs @@ -8,6 +8,21 @@ namespace Ice.Internal; public sealed class StreamSocket { + internal bool isWaitingToBeRead + { + get + { + try + { + return _pendingRead.IsSet || _fd.Available > 0; + } + catch + { + return false; + } + } + } + public StreamSocket(ProtocolInstance instance, NetworkProxy proxy, EndPoint addr, EndPoint sourceAddr) { _instance = instance; @@ -193,6 +208,8 @@ public void finishRead(Buffer buf) Debug.Assert(_fd != null && _readEventArgs != null); try { + _pendingRead.Reset(); + if (_readEventArgs.SocketError != SocketError.Success) { throw new System.Net.Sockets.SocketException((int)_readEventArgs.SocketError); @@ -347,6 +364,7 @@ public void destroy() Debug.Assert(_readEventArgs != null && _writeEventArgs != null); _readEventArgs.Dispose(); _writeEventArgs.Dispose(); + _pendingRead.Dispose(); } public override string ToString() @@ -443,6 +461,7 @@ private void ioCompleted(object sender, SocketAsyncEventArgs e) switch (e.LastOperation) { case SocketAsyncOperation.Receive: + _pendingRead.Set(); _readCallback(e.UserToken); break; case SocketAsyncOperation.Send: @@ -505,6 +524,8 @@ private int toState(int operation) private AsyncCallback _writeCallback; private AsyncCallback _readCallback; + private readonly ManualResetEventSlim _pendingRead = new ManualResetEventSlim(false); + private const int StateNeedConnect = 0; private const int StateConnectPending = 1; private const int StateProxyWrite = 2; diff --git a/csharp/src/Ice/Internal/TcpTransceiver.cs b/csharp/src/Ice/Internal/TcpTransceiver.cs index 41d8effc839..d54cd494d76 100644 --- a/csharp/src/Ice/Internal/TcpTransceiver.cs +++ b/csharp/src/Ice/Internal/TcpTransceiver.cs @@ -8,6 +8,8 @@ namespace Ice.Internal; internal sealed class TcpTransceiver : Transceiver { + public bool isWaitingToBeRead => _stream.isWaitingToBeRead; + public Socket fd() { return _stream.fd(); diff --git a/csharp/src/Ice/Internal/Transceiver.cs b/csharp/src/Ice/Internal/Transceiver.cs index bd98dcdc5e6..1df7c7066b8 100644 --- a/csharp/src/Ice/Internal/Transceiver.cs +++ b/csharp/src/Ice/Internal/Transceiver.cs @@ -6,6 +6,11 @@ namespace Ice.Internal; public interface Transceiver { + /// + /// Checks if this transceiver is waiting to be read, typically because it has bytes readily available for reading. + /// + bool isWaitingToBeRead { get; } + Socket fd(); int initialize(Buffer readBuffer, Buffer writeBuffer, ref bool hasMoreData); int closing(bool initiator, Ice.LocalException ex); diff --git a/csharp/src/Ice/Internal/UdpTransceiver.cs b/csharp/src/Ice/Internal/UdpTransceiver.cs index 6096eb32298..c9c7011604b 100644 --- a/csharp/src/Ice/Internal/UdpTransceiver.cs +++ b/csharp/src/Ice/Internal/UdpTransceiver.cs @@ -9,6 +9,15 @@ namespace Ice.Internal; internal sealed class UdpTransceiver : Transceiver { + public bool isWaitingToBeRead + { + get + { + Debug.Fail("UdpTransceiver does not implement isWaitingToBeRead"); + return false; + } + } + public Socket fd() { return _fd; diff --git a/csharp/src/Ice/Internal/WSTransceiver.cs b/csharp/src/Ice/Internal/WSTransceiver.cs index d9b42ec2c2d..4311184e2c0 100644 --- a/csharp/src/Ice/Internal/WSTransceiver.cs +++ b/csharp/src/Ice/Internal/WSTransceiver.cs @@ -9,6 +9,8 @@ namespace Ice.Internal; internal sealed class WSTransceiver : Transceiver { + public bool isWaitingToBeRead => _readBuffer.b.position() > _readBufferPos || _delegate.isWaitingToBeRead; + public Socket fd() { return _delegate.fd(); diff --git a/csharp/src/Ice/SSL/TransceiverI.cs b/csharp/src/Ice/SSL/TransceiverI.cs index ab11614cab9..1dcd0157f49 100644 --- a/csharp/src/Ice/SSL/TransceiverI.cs +++ b/csharp/src/Ice/SSL/TransceiverI.cs @@ -10,6 +10,8 @@ namespace Ice.SSL; internal sealed class TransceiverI : Ice.Internal.Transceiver { + public bool isWaitingToBeRead => _pendingRead.IsSet || _delegate.isWaitingToBeRead; + public Socket fd() => _delegate.fd(); public int initialize(Ice.Internal.Buffer readBuffer, Ice.Internal.Buffer writeBuffer, ref bool hasMoreData) @@ -86,7 +88,11 @@ public Ice.Internal.EndpointI bind() return null; } - public void destroy() => _delegate.destroy(); + public void destroy() + { + _delegate.destroy(); + _pendingRead.Dispose(); + } public int write(Ice.Internal.Buffer buf) => // Force caller to use async write. @@ -109,7 +115,13 @@ public bool startRead(Ice.Internal.Buffer buf, Ice.Internal.AsyncCallback callba try { _readResult = _sslStream.ReadAsync(buf.b.rawBytes(), buf.b.position(), packetSz); - _readResult.ContinueWith(task => callback(state), TaskScheduler.Default); + _readResult.ContinueWith( + task => + { + _pendingRead.Set(); + callback(state); + }, + TaskScheduler.Default); return false; } catch (IOException ex) @@ -150,6 +162,8 @@ public void finishRead(Ice.Internal.Buffer buf) Debug.Assert(_readResult != null); try { + _pendingRead.Reset(); + int ret; try { @@ -493,6 +507,7 @@ public int getRecvPacketSize(int length) => private bool _authenticated; private Task _writeResult; private Task _readResult; + private readonly ManualResetEventSlim _pendingRead = new ManualResetEventSlim(false); private int _maxSendPacketSize; private int _maxRecvPacketSize; private string _cipher; diff --git a/csharp/test/Ice/background/Transceiver.cs b/csharp/test/Ice/background/Transceiver.cs index 21fbae185bd..cf7ee7c514e 100644 --- a/csharp/test/Ice/background/Transceiver.cs +++ b/csharp/test/Ice/background/Transceiver.cs @@ -5,6 +5,9 @@ internal class Transceiver : Ice.Internal.Transceiver { + public bool isWaitingToBeRead => + (_buffered && (_readBuffer.b.position() - _readBufferPos > 0)) || _transceiver.isWaitingToBeRead; + public Socket fd() { return _transceiver.fd(); diff --git a/csharp/test/Ice/idleTimeout/AllTests.cs b/csharp/test/Ice/idleTimeout/AllTests.cs index f1eb3732cd6..f600def3f2e 100644 --- a/csharp/test/Ice/idleTimeout/AllTests.cs +++ b/csharp/test/Ice/idleTimeout/AllTests.cs @@ -14,8 +14,7 @@ internal static async Task allTests(global::Test.TestHelper helper) string proxyString3s = $"test: {helper.getTestEndpoint(1)}"; - // TODO: this test no longer works with the thread pool fix, idle timeout implementation needs fixing. - // await testIdleCheckDoesNotAbortConnectionWhenThreadPoolIsExhausted(p, helper.getWriter()); + await testIdleCheckDoesNotAbortConnectionWhenThreadPoolIsExhausted(p, helper.getWriter()); await testConnectionAbortedByIdleCheck(proxyString, communicator.getProperties(), helper.getWriter()); await testEnableDisableIdleCheck(true, proxyString3s, communicator.getProperties(), helper.getWriter()); await testEnableDisableIdleCheck(false, proxyString3s, communicator.getProperties(), helper.getWriter()); @@ -39,7 +38,10 @@ private static async Task testIdleCheckDoesNotAbortConnectionWhenThreadPoolIsExh // Establish connection. await p.ice_pingAsync(); - await p.sleepAsync(2000); // the implementation in the server sleeps for 2,000ms + await p.sleepAsync(4000); // the implementation in the server sleeps for 2,000ms + + // close connection + p.ice_getConnection().close(ConnectionClose.GracefullyWithWait); output.WriteLine("ok"); } diff --git a/csharp/test/Ice/idleTimeout/Client.cs b/csharp/test/Ice/idleTimeout/Client.cs index 803319784e4..73d7605d003 100644 --- a/csharp/test/Ice/idleTimeout/Client.cs +++ b/csharp/test/Ice/idleTimeout/Client.cs @@ -8,6 +8,9 @@ public override async Task runAsync(string[] args) { var properties = createTestProperties(ref args); properties.setProperty("Ice.Connection.IdleTimeout", "1"); + + // TODO: temporary work-around for IceSSL always sending messages asynchronously. + properties.setProperty("Ice.Connection.EnableIdleCheck", "0"); using var communicator = initialize(properties); await AllTests.allTests(this); } diff --git a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Transceiver.java b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Transceiver.java index 1f2b661f14f..14a01ea9439 100644 --- a/java/src/Ice/src/main/java/com/zeroc/IceInternal/Transceiver.java +++ b/java/src/Ice/src/main/java/com/zeroc/IceInternal/Transceiver.java @@ -39,8 +39,7 @@ public interface Transceiver { /** * Checks if this transceiver is waiting to be read, typically because it has bytes readily - * available for reading. The caller must ensure the transceiver is not closed when calling this - * method. + * available for reading. * * @return true if this transceiver is waiting to be read, false otherwise. */