Skip to content

Commit

Permalink
Implement JavaScript inactivity timeout (zeroc-ice#2333)
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone authored Jun 21, 2024
1 parent 1982ee7 commit 3c77df8
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 3 deletions.
1 change: 1 addition & 0 deletions js/gulpfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ const tests = [
"test/Ice/facets",
"test/Ice/hold",
"test/Ice/idleTimeout",
"test/Ice/inactivityTimeout",
"test/Ice/info",
"test/Ice/inheritance",
"test/Ice/location",
Expand Down
2 changes: 1 addition & 1 deletion js/src/Ice/ConnectRequestHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ export class ConnectRequestHandler {
this._reference.getInstance().requestHandlerFactory().removeRequestHandler(this._reference, this);
request.retryException(ex.inner);
} else {
Debug.assert(ex instanceof LocalException);
Debug.assert(ex instanceof LocalException, ex);
exception = ex;
request.out.completedEx(ex);
}
Expand Down
96 changes: 94 additions & 2 deletions js/src/Ice/ConnectionI.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
CommunicatorDestroyedException,
CloseConnectionException,
ConnectionManuallyClosedException,
ConnectionClosedException,
ConnectTimeoutException,
ConnectionIdleException,
ConnectionLostException,
Expand Down Expand Up @@ -81,6 +82,8 @@ export class ConnectionI {
this._closeTimeoutId = undefined;

this._inactivityTimeout = options.inactivityTimeout;
this._inactivityTimer = undefined;

const initData = instance.initializationData();
this._logger = initData.logger; // Cached for better performance.
this._traceLevels = instance.traceLevels(); // Cached for better performance.
Expand All @@ -102,8 +105,13 @@ export class ConnectionI {
this._readStreamPos = -1;
this._writeStreamPos = -1;

// The number of user calls currently executed by the event-loop (servant dispatch, invocation response, etc.).
this._upcallCount = 0;

// The number of outstanding dispatches. This does not include heartbeat messages, even when the heartbeat
// callback is not null. Maintained only while state is StateActive or StateHolding.
this._dispatchCount = 0;

this._state = StateNotInitialized;
this._shutdownInitiated = false;
this._initialized = false;
Expand Down Expand Up @@ -431,6 +439,8 @@ export class ConnectionI {

this.sendMessage(OutgoingMessage.createForStream(os, true));

--this._dispatchCount;

if (this._state === StateClosing && this._upcallCount === 0) {
this.initiateShutdown();
}
Expand Down Expand Up @@ -458,6 +468,8 @@ export class ConnectionI {
throw this._exception;
}

--this._dispatchCount;

if (this._state === StateClosing && this._upcallCount === 0) {
this.initiateShutdown();
}
Expand Down Expand Up @@ -751,6 +763,7 @@ export class ConnectionI {
!(
this._exception instanceof CloseConnectionException ||
this._exception instanceof ConnectionManuallyClosedException ||
this._exception instanceof ConnectionClosedException ||
this._exception instanceof ConnectionIdleException ||
this._exception instanceof CommunicatorDestroyedException ||
this._exception instanceof ObjectAdapterDeactivatedException
Expand Down Expand Up @@ -880,9 +893,27 @@ export class ConnectionI {
}
}

inactivityCheck(inactivityTimer) {
// If the timers are different, it means this inactivityTimer is no longer current.
if (inactivityTimer == this._inactivityTimer) {
this._inactivityTimer = undefined;
inactivityTimer.destroy();

if (this._state == StateActive) {
this.setState(
StateClosing,
new ConnectionClosedException(
"connection closed because it remained inactive for longer than the inactivity timeout",
),
);
}
}
// Else this timer was already canceled and disposed. Nothing to do.
}

setState(state, ex) {
if (ex !== undefined) {
Debug.assert(ex instanceof LocalException);
Debug.assert(ex instanceof LocalException, ex);

//
// If setState() is called with an exception, then only closed
Expand All @@ -909,6 +940,7 @@ export class ConnectionI {
!(
this._exception instanceof CloseConnectionException ||
this._exception instanceof ConnectionManuallyClosedException ||
this._exception instanceof ConnectionClosedException ||
this._exception instanceof ConnectionIdleException ||
this._exception instanceof CommunicatorDestroyedException ||
this._exception instanceof ObjectAdapterDeactivatedException ||
Expand Down Expand Up @@ -939,6 +971,11 @@ export class ConnectionI {
return;
}

if (state > StateActive) {
// Cancel the inactivity timer, if not null.
this.cancelInactivityTimer();
}

try {
switch (state) {
case StateNotInitialized: {
Expand Down Expand Up @@ -1277,12 +1314,45 @@ export class ConnectionI {
}

sendMessage(message) {
Debug.assert(this._state >= StateActive);
Debug.assert(this._state < StateClosed);

const isHeartbeat = message.stream.buffer.getAt(8) == Protocol.validateConnectionMsg;
if (!isHeartbeat) {
this.cancelInactivityTimer();
}
// If we're sending a heartbeat, there is a chance the connection is inactive and that we need to schedule the
// inactivity timer. It's ok to do this before actually sending the heartbeat since the heartbeat does not count
// as an "activity".
else if (
this._inactivityTimer === undefined && // timer not already scheduled
this._inactivityTimeout > 0 && // inactivity timeout is enabled
this._state == StateActive && // only schedule the timer if the connection is active
this._dispatchCount == 0 && // no pending dispatch
this._asyncRequests.size == 0 && // no pending invocation
this._readHeader // we're not waiting for the remainder of an incoming message
) {
let isInactive = true;

// We may become inactive while the peer is back-pressuring us. In this case, we only schedule the
// inactivity timer if all outgoing messages in _sendStreams are heartbeats.
for (const queuedMessage of this._sendStreams) {
if (queuedMessage.stream.buffer.getAt(8) != Protocol.validateConnectionMsg) {
isInactive = false;
break; // for
}
}

if (isInactive) {
this.scheduleInactivityTimer();
}
}

if (this._sendStreams.length > 0) {
message.doAdopt();
this._sendStreams.push(message);
return AsyncStatus.Queued;
}
Debug.assert(this._state < StateClosed);

Debug.assert(!message.prepared);

Expand Down Expand Up @@ -1357,6 +1427,9 @@ export class ConnectionI {
info.servantManager = this._servantManager;
info.adapter = this._adapter;
++this._upcallCount;

this.cancelInactivityTimer();
++this._dispatchCount;
}
break;
}
Expand All @@ -1379,6 +1452,9 @@ export class ConnectionI {
info.servantManager = this._servantManager;
info.adapter = this._adapter;
this._upcallCount += info.invokeNum;

this.cancelInactivityTimer();
++this._dispatchCount;
}
break;
}
Expand Down Expand Up @@ -1538,6 +1614,22 @@ export class ConnectionI {
}
return ret;
}

scheduleInactivityTimer() {
Debug.assert(this._inactivityTimer === undefined);
Debug.assert(this._inactivityTimeout > 0);

this._inactivityTimer = new Timer();
const inactivityTimer = this._inactivityTimer;
this._inactivityTimer.schedule(() => this.inactivityCheck(inactivityTimer), this._inactivityTimeout);
}

cancelInactivityTimer() {
if (this._inactivityTimer !== undefined) {
this._inactivityTimer.destroy();
this._inactivityTimer = undefined;
}
}
}

// DestructionReason.
Expand Down
5 changes: 5 additions & 0 deletions js/src/Ice/LocalException.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,11 @@ declare module "ice" {
*/
class CloseTimeoutException extends TimeoutException {}

/**
* This exception indicates that a connection was closed gracefully.
*/
class ConnectionClosedException extends LocalException {}

/**
* This exception indicates that a connection was aborted by the idle check.
*/
Expand Down
13 changes: 13 additions & 0 deletions js/src/Ice/LocalException.js
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,19 @@ export class CloseTimeoutException extends TimeoutException {
}
}

/**
* This exception indicates that a connection was closed gracefully.
**/
export class ConnectionClosedException extends LocalException {
constructor(_cause = "") {
super(_cause);
}

static get _id() {
return "::Ice::ConnectionClosedException";
}
}

/**
* This exception indicates that a connection was aborted by the idle check.
**/
Expand Down
89 changes: 89 additions & 0 deletions js/test/Ice/inactivityTimeout/Client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

import { Ice } from "ice";
import { Test } from "./Test.js";
import { TestHelper } from "../../Common/TestHelper.js";

const test = TestHelper.test;

async function testClientInactivityTimeout(p: Test.TestIntfPrx, helper: TestHelper) {
const output = helper.getWriter();
output.write("testing that the client side inactivity timeout shuts down the connection... ");

await p.ice_ping();
const connection = await p.ice_getConnection();
test(connection !== null);

// The inactivity timeout is 3s on the client side and 5s on the server side. 4 seconds tests the client side.
await Ice.Promise.delay(4000);
await p.ice_ping();
const connection2 = await p.ice_getConnection();
test(connection2 != connection);
output.writeLine("ok");
}

async function testWithOutstandingRequest(p: Test.TestIntfPrx, oneway: boolean, helper: TestHelper) {
const output = helper.getWriter();
const onewayString = oneway ? "one-way" : "two-way";
output.write(`testing the inactivity timeout with an outstanding ${onewayString} request... `);

if (oneway) {
p = p.ice_oneway();
}

await p.ice_ping();
const connection = await p.ice_getConnection();
test(connection !== null);

// The inactivity timeout is 3s on the client side and 5s on the server side; 4 seconds tests only the
// client-side.
await p.sleep(4000); // two-way blocks for 4 seconds; one-way is non-blocking
if (oneway) {
await Ice.Promise.delay(4000);
}
await p.ice_ping();
const connection2 = await p.ice_getConnection();

if (oneway) {
// With a oneway invocation, the inactivity timeout on the client side shut down the first connection.
test(connection2 != connection);
} else {
// With a two-way invocation, the inactivity timeout should not shutdown any connection.
test(connection2 == connection);
}
output.writeLine("ok");
}

export class Client extends TestHelper {
async allTests(): Promise<void> {
const communicator = this.communicator();

const proxyString = `test: ${this.getTestEndpoint()}`;
const p = Test.TestIntfPrx.uncheckedCast(communicator.stringToProxy(proxyString));

await testClientInactivityTimeout(p, this);
await testWithOutstandingRequest(p, false, this);
await testWithOutstandingRequest(p, true, this);

await p.shutdown();
}

async run(args: string[]) {
let communicator: Ice.Communicator | null = null;
try {
const [properties] = this.createTestProperties(args);
// We configure a low idle timeout to make sure we send heartbeats frequently. It's the sending of the
// heartbeats that schedules the inactivity timer task.
properties.setProperty("Ice.Connection.IdleTimeout", "1");
properties.setProperty("Ice.Connection.InactivityTimeout", "3");
[communicator] = this.initialize(properties);
await this.allTests();
} finally {
if (communicator) {
await communicator.destroy();
}
}
}
}
14 changes: 14 additions & 0 deletions js/test/Ice/inactivityTimeout/Test.ice
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

#pragma once

module Test
{
interface TestIntf
{
void sleep(int ms);
void shutdown();
}
}

0 comments on commit 3c77df8

Please sign in to comment.