Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proof of concept for MaxDispatches implementation #2598

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 33 additions & 7 deletions csharp/src/Ice/ConnectionI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,7 @@ internal ConnectionI(
_closeTimeout = options.closeTimeout; // not used for datagram connections
// suppress inactivity timeout for datagram connections
_inactivityTimeout = endpoint.datagram() ? TimeSpan.Zero : options.inactivityTimeout;
_maxDispatches = options.maxDispatches;
_removeFromFactory = removeFromFactory;
_warn = initData.properties.getIcePropertyAsInt("Ice.Warn.Connections") > 0;
_warnUdp = initData.properties.getIcePropertyAsInt("Ice.Warn.Datagrams") > 0;
Expand Down Expand Up @@ -1647,31 +1648,40 @@ private void setState(int state)
case StateActive:
{
//
// Can only switch from holding or not validated to
// active.
// Can only switch to active from holding or not validated.
//
if (_state != StateHolding && _state != StateNotValidated)
{
return;
}

if (_maxDispatches > 0 && _dispatchCount >= _maxDispatches)
{
// Don't resume reads if the max dispatch count is reached.
return;
}

_threadPool.register(this, SocketOperation.Read);
break;
}

case StateHolding:
{
//
// Can only switch from active or not validated to
// holding.
// Can only switch to holding from active or not validated.
bentoi marked this conversation as resolved.
Show resolved Hide resolved
//
if (_state != StateActive && _state != StateNotValidated)
{
return;
}
if (_state == StateActive)

if (_maxDispatches > 0 && _dispatchCount >= _maxDispatches)
{
_threadPool.unregister(this, SocketOperation.Read);
// Reads are already disabled if the max dispatch count is reached.
return;
}

_threadPool.unregister(this, SocketOperation.Read);
bentoi marked this conversation as resolved.
Show resolved Hide resolved
break;
}

Expand Down Expand Up @@ -2409,7 +2419,15 @@ private int parseMessage(ref MessageInfo info)
}
}

return _state == StateHolding ? SocketOperation.None : SocketOperation.Read;
if (_state == StateHolding || (_maxDispatches > 0 && _dispatchCount >= _maxDispatches))
{
// Don't re-enable reads if the connection is in the holding state or if the max dispatch count is reached.
return SocketOperation.None;
}
else
{
return SocketOperation.Read;
}
}

private void dispatchAll(
Expand Down Expand Up @@ -2515,6 +2533,13 @@ private void sendResponse(OutgoingResponse response, bool isTwoWay, byte compres
sendMessage(new OutgoingMessage(response.outputStream, compress > 0, adopt: true));
}

if (_state == StateActive && _maxDispatches > 0 && _dispatchCount == _maxDispatches)
{
// Resume reads if the connection is active and the dispatch count is (about to be) bellow the
bentoi marked this conversation as resolved.
Show resolved Hide resolved
// max dispatch count.
bentoi marked this conversation as resolved.
Show resolved Hide resolved
_threadPool.update(this, SocketOperation.None, SocketOperation.Read);
}

--_dispatchCount;

if (_state == StateClosing && _upcallCount == 0)
Expand Down Expand Up @@ -2895,6 +2920,7 @@ internal void completed(LocalException ex)

// The number of outstanding dispatches. Maintained only while state is StateActive or StateHolding.
private int _dispatchCount;
private readonly int _maxDispatches;

private int _state; // The current state.
private bool _shutdownInitiated;
Expand Down
3 changes: 2 additions & 1 deletion csharp/src/Ice/ConnectionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ internal sealed record class ConnectionOptions(
TimeSpan closeTimeout,
TimeSpan idleTimeout,
bool enableIdleCheck,
TimeSpan inactivityTimeout);
TimeSpan inactivityTimeout,
int maxDispatches);
47 changes: 47 additions & 0 deletions csharp/src/Ice/Internal/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1419,6 +1419,20 @@ public override void message(ThreadPoolCurrent current)
}

_connections.Add(connection);
if (_maxConnections > 0 && _connections.Count == _maxConnections)
{
if (_instance.traceLevels().network >= 1)
{
StringBuilder s = new StringBuilder("holding ");
bentoi marked this conversation as resolved.
Show resolved Hide resolved
s.Append(_endpoint.protocol());
s.Append(" connections at ");
s.Append(_acceptor.ToString());
s.Append(" because the maximum number of connections is reached");
_instance.initializationData().logger.trace(_instance.traceLevels().networkCat,
s.ToString());
}
_adapter.getThreadPool().unregister(this, SocketOperation.Read);
}
}
finally
{
Expand Down Expand Up @@ -1504,6 +1518,9 @@ public IncomingConnectionFactory(Instance instance, EndpointI endpoint, Endpoint
_state = StateHolding;
_acceptorStarted = false;

// TODO: add MaxConnections property.
bentoi marked this conversation as resolved.
Show resolved Hide resolved
_maxConnections = 0;

DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();

if (defaultsAndOverrides.overrideCompress is not null)
Expand Down Expand Up @@ -1595,6 +1612,13 @@ private void setState(int state)
{
return;
}

if (_maxConnections > 0 && _connections.Count == _maxConnections)
bentoi marked this conversation as resolved.
Show resolved Hide resolved
{
// Don't resume accepting new connections if the max connection count is reached.
return;
}

if (_acceptor != null)
{
if (_instance.traceLevels().network >= 1)
Expand Down Expand Up @@ -1622,6 +1646,12 @@ private void setState(int state)
{
return;
}

if (_maxConnections > 0 && _connections.Count == _maxConnections)
{
return;
}

if (_acceptor != null)
{
if (_instance.traceLevels().network >= 1)
Expand Down Expand Up @@ -1739,10 +1769,26 @@ private void closeAcceptor()

private void removeConnection(ConnectionI connection)
{
Debug.Assert(_acceptor != null);

lock (this)
{
if (_state is StateActive or StateHolding)
{
if (_state is StateActive && _maxConnections > 0 && _connections.Count == _maxConnections)
{
if (_instance.traceLevels().network >= 1)
{
StringBuilder s = new StringBuilder("accepting ");
s.Append(_endpoint.protocol());
s.Append(" connections at ");
s.Append(_acceptor.ToString());
_instance.initializationData().logger.trace(_instance.traceLevels().networkCat,
s.ToString());
}
// Resume accepting new connections.
_adapter.getThreadPool().register(this, SocketOperation.Read);
}
_connections.Remove(connection);
}
// else it's already being cleaned up.
Expand Down Expand Up @@ -1770,5 +1816,6 @@ private void warning(Ice.LocalException ex)

private int _state;
private bool _acceptorStarted;
private int _maxConnections;
private Ice.LocalException _acceptorException;
}
13 changes: 8 additions & 5 deletions csharp/src/Ice/Internal/Instance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -706,11 +706,10 @@ internal void initialize(Ice.Communicator communicator, Ice.InitializationData i
closeTimeout: TimeSpan.FromSeconds(properties.getIcePropertyAsInt("Ice.Connection.CloseTimeout")),
idleTimeout: TimeSpan.FromSeconds(properties.getIcePropertyAsInt("Ice.Connection.IdleTimeout")),
enableIdleCheck: properties.getIcePropertyAsInt("Ice.Connection.EnableIdleCheck") > 0,
inactivityTimeout: TimeSpan.FromSeconds(properties.getIcePropertyAsInt("Ice.Connection.InactivityTimeout")));

inactivityTimeout: TimeSpan.FromSeconds(properties.getIcePropertyAsInt("Ice.Connection.InactivityTimeout")),
maxDispatches: properties.getIcePropertyAsInt("Ice.Connection.MaxDispatches"));
{
int num =
_initData.properties.getIcePropertyAsInt("Ice.MessageSizeMax");
int num = _initData.properties.getIcePropertyAsInt("Ice.MessageSizeMax");
if (num < 1 || num > 0x7fffffff / 1024)
{
_messageSizeMax = 0x7fffffff;
Expand Down Expand Up @@ -1436,7 +1435,11 @@ internal ConnectionOptions serverConnectionOptions(string adapterName)

inactivityTimeout: TimeSpan.FromSeconds(properties.getPropertyAsIntWithDefault(
$"{adapterName}.Connection.InactivityTimeout",
(int)clientConnectionOptions.inactivityTimeout.TotalSeconds)));
(int)clientConnectionOptions.inactivityTimeout.TotalSeconds)),

maxDispatches: properties.getPropertyAsIntWithDefault(
$"{adapterName}.Connection.MaxDispatches",
(int)clientConnectionOptions.maxDispatches));
bentoi marked this conversation as resolved.
Show resolved Hide resolved
}
else
{
Expand Down
1 change: 1 addition & 0 deletions csharp/src/Ice/ObjectAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,7 @@ private Object createDispatchPipeline()
"Connection.EnableIdleCheck",
"Connection.IdleTimeout",
"Connection.InactivityTimeout",
"Connection.MaxDispatches",
"Endpoints",
"Locator",
"Locator.EncodingVersion",
Expand Down
19 changes: 19 additions & 0 deletions csharp/test/Ice/idleTimeout/AllTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ internal static async Task allTests(global::Test.TestHelper helper)

string proxyString3s = $"test: {helper.getTestEndpoint(1)}";

string proxyStringMaxDispatches = $"test: {helper.getTestEndpoint(2)}";
Test.TestIntfPrx pMaxDispatches = Test.TestIntfPrxHelper.createProxy(communicator, proxyStringMaxDispatches);

await testIdleCheckDoesNotAbortConnectionWhenThreadPoolIsExhausted(p, helper.getWriter());
await testConnectionAbortedByIdleCheck(proxyString, communicator.getProperties(), helper.getWriter());
await testEnableDisableIdleCheck(true, proxyString3s, communicator.getProperties(), helper.getWriter());
await testEnableDisableIdleCheck(false, proxyString3s, communicator.getProperties(), helper.getWriter());
await testMaxDispatches(pMaxDispatches, helper.getWriter());

await p.shutdownAsync();
}
Expand Down Expand Up @@ -115,4 +119,19 @@ private static async Task testEnableDisableIdleCheck(
}
output.WriteLine("ok");
}

private static async Task testMaxDispatches(Test.TestIntfPrx p, TextWriter output)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should naturally test 'max dispatches' in its own test fixture, not in the idle timeout test fixture.

{
output.Write("testing max dispatches... ");
output.Flush();

var sleepTask = p.sleepAsync(2000);
var sleepTask2 = p.sleepAsync(1000);
await Task.Delay(1500);
test(sleepTask2.IsCompleted == false);
await sleepTask;
await sleepTask2;

output.WriteLine("ok");
}
}
7 changes: 7 additions & 0 deletions csharp/test/Ice/idleTimeout/Server.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ public override void run(string[] args)
adapter3s.add(new TestIntfI(), Ice.Util.stringToIdentity("test"));
adapter3s.activate();

communicator.getProperties().setProperty("TestAdapterMaxDispatches.Endpoints", getTestEndpoint(2));
communicator.getProperties().setProperty("TestAdapterMaxDispatches.ThreadPool.Size", "10");
communicator.getProperties().setProperty("TestAdapterMaxDispatches.Connection.MaxDispatches", "1");
var adapterMaxDispatches = communicator.createObjectAdapter("TestAdapterMaxDispatches");
adapterMaxDispatches.add(new TestIntfI(), Ice.Util.stringToIdentity("test"));
adapterMaxDispatches.activate();

serverReady();
communicator.waitForShutdown();
}
Expand Down
Loading