diff --git a/csharp/src/Ice/ConnectionI.cs b/csharp/src/Ice/ConnectionI.cs index fded3c6f303..9fb4c159f0e 100644 --- a/csharp/src/Ice/ConnectionI.cs +++ b/csharp/src/Ice/ConnectionI.cs @@ -1376,6 +1376,7 @@ internal ConnectionI( _closeTimeout = options.closeTimeout; // not used for datagram connections // suppress inactivity timeout for datagram connections _inactivityTimeout = endpoint.datagram() ? TimeSpan.Zero : options.inactivityTimeout; + _maxDispatches = options.maxDispatches; _removeFromFactory = removeFromFactory; _warn = initData.properties.getIcePropertyAsInt("Ice.Warn.Connections") > 0; _warnUdp = initData.properties.getIcePropertyAsInt("Ice.Warn.Datagrams") > 0; @@ -1647,13 +1648,19 @@ private void setState(int state) case StateActive: { // - // Can only switch from holding or not validated to - // active. + // Can only switch to active from holding or not validated. // if (_state != StateHolding && _state != StateNotValidated) { return; } + + if (_maxDispatches > 0 && _dispatchCount >= _maxDispatches) + { + // Don't resume reads if the max dispatch count is reached. + return; + } + _threadPool.register(this, SocketOperation.Read); break; } @@ -1661,17 +1668,20 @@ private void setState(int state) case StateHolding: { // - // Can only switch from active or not validated to - // holding. + // Can only switch to holding from active or not validated. // if (_state != StateActive && _state != StateNotValidated) { return; } - if (_state == StateActive) + + if (_maxDispatches > 0 && _dispatchCount >= _maxDispatches) { - _threadPool.unregister(this, SocketOperation.Read); + // Reads are already disabled if the max dispatch count is reached. + return; } + + _threadPool.unregister(this, SocketOperation.Read); break; } @@ -2409,7 +2419,15 @@ private int parseMessage(ref MessageInfo info) } } - return _state == StateHolding ? SocketOperation.None : SocketOperation.Read; + if (_state == StateHolding || (_maxDispatches > 0 && _dispatchCount >= _maxDispatches)) + { + // Don't re-enable reads if the connection is in the holding state or if the max dispatch count is reached. + return SocketOperation.None; + } + else + { + return SocketOperation.Read; + } } private void dispatchAll( @@ -2515,6 +2533,13 @@ private void sendResponse(OutgoingResponse response, bool isTwoWay, byte compres sendMessage(new OutgoingMessage(response.outputStream, compress > 0, adopt: true)); } + if (_state == StateActive && _maxDispatches > 0 && _dispatchCount == _maxDispatches) + { + // Resume reads if the connection is active and the dispatch count is (about to be) bellow the + // max dispatch count. + _threadPool.update(this, SocketOperation.None, SocketOperation.Read); + } + --_dispatchCount; if (_state == StateClosing && _upcallCount == 0) @@ -2895,6 +2920,7 @@ internal void completed(LocalException ex) // The number of outstanding dispatches. Maintained only while state is StateActive or StateHolding. private int _dispatchCount; + private readonly int _maxDispatches; private int _state; // The current state. private bool _shutdownInitiated; diff --git a/csharp/src/Ice/ConnectionOptions.cs b/csharp/src/Ice/ConnectionOptions.cs index 4519a6b8676..dc0dfde0c4a 100644 --- a/csharp/src/Ice/ConnectionOptions.cs +++ b/csharp/src/Ice/ConnectionOptions.cs @@ -9,4 +9,5 @@ internal sealed record class ConnectionOptions( TimeSpan closeTimeout, TimeSpan idleTimeout, bool enableIdleCheck, - TimeSpan inactivityTimeout); + TimeSpan inactivityTimeout, + int maxDispatches); diff --git a/csharp/src/Ice/Internal/ConnectionFactory.cs b/csharp/src/Ice/Internal/ConnectionFactory.cs index e3789828b85..f11559bd8b3 100644 --- a/csharp/src/Ice/Internal/ConnectionFactory.cs +++ b/csharp/src/Ice/Internal/ConnectionFactory.cs @@ -1419,6 +1419,20 @@ public override void message(ThreadPoolCurrent current) } _connections.Add(connection); + if (_maxConnections > 0 && _connections.Count == _maxConnections) + { + if (_instance.traceLevels().network >= 1) + { + StringBuilder s = new StringBuilder("holding "); + s.Append(_endpoint.protocol()); + s.Append(" connections at "); + s.Append(_acceptor.ToString()); + s.Append(" because the maximum number of connections is reached"); + _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, + s.ToString()); + } + _adapter.getThreadPool().unregister(this, SocketOperation.Read); + } } finally { @@ -1504,6 +1518,9 @@ public IncomingConnectionFactory(Instance instance, EndpointI endpoint, Endpoint _state = StateHolding; _acceptorStarted = false; + // TODO: add MaxConnections property. + _maxConnections = 0; + DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); if (defaultsAndOverrides.overrideCompress is not null) @@ -1595,6 +1612,13 @@ private void setState(int state) { return; } + + if (_maxConnections > 0 && _connections.Count == _maxConnections) + { + // Don't resume accepting new connections if the max connection count is reached. + return; + } + if (_acceptor != null) { if (_instance.traceLevels().network >= 1) @@ -1622,6 +1646,12 @@ private void setState(int state) { return; } + + if (_maxConnections > 0 && _connections.Count == _maxConnections) + { + return; + } + if (_acceptor != null) { if (_instance.traceLevels().network >= 1) @@ -1739,10 +1769,26 @@ private void closeAcceptor() private void removeConnection(ConnectionI connection) { + Debug.Assert(_acceptor != null); + lock (this) { if (_state is StateActive or StateHolding) { + if (_state is StateActive && _maxConnections > 0 && _connections.Count == _maxConnections) + { + if (_instance.traceLevels().network >= 1) + { + StringBuilder s = new StringBuilder("accepting "); + s.Append(_endpoint.protocol()); + s.Append(" connections at "); + s.Append(_acceptor.ToString()); + _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, + s.ToString()); + } + // Resume accepting new connections. + _adapter.getThreadPool().register(this, SocketOperation.Read); + } _connections.Remove(connection); } // else it's already being cleaned up. @@ -1770,5 +1816,6 @@ private void warning(Ice.LocalException ex) private int _state; private bool _acceptorStarted; + private int _maxConnections; private Ice.LocalException _acceptorException; } diff --git a/csharp/src/Ice/Internal/Instance.cs b/csharp/src/Ice/Internal/Instance.cs index ac38e058f68..0a2a63e7690 100644 --- a/csharp/src/Ice/Internal/Instance.cs +++ b/csharp/src/Ice/Internal/Instance.cs @@ -706,11 +706,10 @@ internal void initialize(Ice.Communicator communicator, Ice.InitializationData i closeTimeout: TimeSpan.FromSeconds(properties.getIcePropertyAsInt("Ice.Connection.CloseTimeout")), idleTimeout: TimeSpan.FromSeconds(properties.getIcePropertyAsInt("Ice.Connection.IdleTimeout")), enableIdleCheck: properties.getIcePropertyAsInt("Ice.Connection.EnableIdleCheck") > 0, - inactivityTimeout: TimeSpan.FromSeconds(properties.getIcePropertyAsInt("Ice.Connection.InactivityTimeout"))); - + inactivityTimeout: TimeSpan.FromSeconds(properties.getIcePropertyAsInt("Ice.Connection.InactivityTimeout")), + maxDispatches: properties.getIcePropertyAsInt("Ice.Connection.MaxDispatches")); { - int num = - _initData.properties.getIcePropertyAsInt("Ice.MessageSizeMax"); + int num = _initData.properties.getIcePropertyAsInt("Ice.MessageSizeMax"); if (num < 1 || num > 0x7fffffff / 1024) { _messageSizeMax = 0x7fffffff; @@ -1436,7 +1435,11 @@ internal ConnectionOptions serverConnectionOptions(string adapterName) inactivityTimeout: TimeSpan.FromSeconds(properties.getPropertyAsIntWithDefault( $"{adapterName}.Connection.InactivityTimeout", - (int)clientConnectionOptions.inactivityTimeout.TotalSeconds))); + (int)clientConnectionOptions.inactivityTimeout.TotalSeconds)), + + maxDispatches: properties.getPropertyAsIntWithDefault( + $"{adapterName}.Connection.MaxDispatches", + (int)clientConnectionOptions.maxDispatches)); } else { diff --git a/csharp/src/Ice/ObjectAdapter.cs b/csharp/src/Ice/ObjectAdapter.cs index c263c14836b..37f13fde342 100644 --- a/csharp/src/Ice/ObjectAdapter.cs +++ b/csharp/src/Ice/ObjectAdapter.cs @@ -1646,6 +1646,7 @@ private Object createDispatchPipeline() "Connection.EnableIdleCheck", "Connection.IdleTimeout", "Connection.InactivityTimeout", + "Connection.MaxDispatches", "Endpoints", "Locator", "Locator.EncodingVersion", diff --git a/csharp/test/Ice/idleTimeout/AllTests.cs b/csharp/test/Ice/idleTimeout/AllTests.cs index f600def3f2e..cfe8c7f6c35 100644 --- a/csharp/test/Ice/idleTimeout/AllTests.cs +++ b/csharp/test/Ice/idleTimeout/AllTests.cs @@ -14,10 +14,14 @@ internal static async Task allTests(global::Test.TestHelper helper) string proxyString3s = $"test: {helper.getTestEndpoint(1)}"; + string proxyStringMaxDispatches = $"test: {helper.getTestEndpoint(2)}"; + Test.TestIntfPrx pMaxDispatches = Test.TestIntfPrxHelper.createProxy(communicator, proxyStringMaxDispatches); + await testIdleCheckDoesNotAbortConnectionWhenThreadPoolIsExhausted(p, helper.getWriter()); await testConnectionAbortedByIdleCheck(proxyString, communicator.getProperties(), helper.getWriter()); await testEnableDisableIdleCheck(true, proxyString3s, communicator.getProperties(), helper.getWriter()); await testEnableDisableIdleCheck(false, proxyString3s, communicator.getProperties(), helper.getWriter()); + await testMaxDispatches(pMaxDispatches, helper.getWriter()); await p.shutdownAsync(); } @@ -115,4 +119,19 @@ private static async Task testEnableDisableIdleCheck( } output.WriteLine("ok"); } + + private static async Task testMaxDispatches(Test.TestIntfPrx p, TextWriter output) + { + output.Write("testing max dispatches... "); + output.Flush(); + + var sleepTask = p.sleepAsync(2000); + var sleepTask2 = p.sleepAsync(1000); + await Task.Delay(1500); + test(sleepTask2.IsCompleted == false); + await sleepTask; + await sleepTask2; + + output.WriteLine("ok"); + } } diff --git a/csharp/test/Ice/idleTimeout/Server.cs b/csharp/test/Ice/idleTimeout/Server.cs index d25ea15fa5e..f1f2cd90cde 100644 --- a/csharp/test/Ice/idleTimeout/Server.cs +++ b/csharp/test/Ice/idleTimeout/Server.cs @@ -24,6 +24,13 @@ public override void run(string[] args) adapter3s.add(new TestIntfI(), Ice.Util.stringToIdentity("test")); adapter3s.activate(); + communicator.getProperties().setProperty("TestAdapterMaxDispatches.Endpoints", getTestEndpoint(2)); + communicator.getProperties().setProperty("TestAdapterMaxDispatches.ThreadPool.Size", "10"); + communicator.getProperties().setProperty("TestAdapterMaxDispatches.Connection.MaxDispatches", "1"); + var adapterMaxDispatches = communicator.createObjectAdapter("TestAdapterMaxDispatches"); + adapterMaxDispatches.add(new TestIntfI(), Ice.Util.stringToIdentity("test")); + adapterMaxDispatches.activate(); + serverReady(); communicator.waitForShutdown(); }