From eda09af18992a0264c15c6a33a50447ab606fddc Mon Sep 17 00:00:00 2001 From: Jose Date: Mon, 18 Nov 2024 20:10:35 +0100 Subject: [PATCH] Fix JavaScript connection scheduling of heartbeats --- js/src/Ice/ConnectionI.js | 58 +++++++++++++------ js/src/Ice/IdleTimeoutTransceiverDecorator.js | 4 +- js/src/Ice/TcpEndpointI.js | 4 +- js/src/Ice/TcpTransceiver.js | 10 +--- js/src/Ice/WSTransceiver.js | 10 +--- 5 files changed, 49 insertions(+), 37 deletions(-) diff --git a/js/src/Ice/ConnectionI.js b/js/src/Ice/ConnectionI.js index 1496cc5d6fd..597b9fbc54a 100644 --- a/js/src/Ice/ConnectionI.js +++ b/js/src/Ice/ConnectionI.js @@ -415,14 +415,26 @@ export class ConnectionI { let info = null; let response = null; try { - if ((operation & SocketOperation.Write) !== 0 && this._writeStream.buffer.remaining > 0) { - DEV: console.assert(this._sendStreams.length > 0); - const currentMessage = this._sendStreams[0]; - if (!this.write(this._writeStream.buffer, () => (currentMessage.isSent = true))) { - DEV: console.assert(!this._writeStream.isEmpty()); - return; + if ((operation & SocketOperation.Write) !== 0) { + if (this._writeStream.buffer.remaining > 0) { + DEV: console.assert(this._sendStreams.length > 0); + const completedSynchronously = this.write(this._writeStream.buffer); + if (this._writeStream.buffer.remaining === 0) { + // If the write call consumed the complete buffer, we assume the message is sent now for + // at-most-once semantics. + this._sendStreams[0].isSent = true; + } + + if (!completedSynchronously) { + DEV: console.assert(!this._writeStream.isEmpty()); + return; + } + DEV: console.assert(this._writeStream.buffer.remaining === 0); + } else if (this._transceiver instanceof IdleTimeoutTransceiverDecorator) { + // The writing of the current message completed, schedule a heartbeat in case the connection has + // nothing more to write. + this._transceiver.scheduleHeartbeat(); } - DEV: console.assert(this._writeStream.buffer.remaining === 0); } if ((operation & SocketOperation.Read) !== 0 && !this._readStream.isEmpty()) { if (this._readHeader) { @@ -1156,13 +1168,18 @@ export class ConnectionI { this._writeStream.swap(message.stream); // Send the message. - const currentMessage = message; - if ( - this._writeStream.pos != this._writeStream.size && - !this.write(this._writeStream.buffer, () => (currentMessage.isSent = true)) - ) { - DEV: console.assert(!this._writeStream.isEmpty()); - return response; // not done + if (this._writeStream.pos != this._writeStream.size) { + const completedSynchronously = this.write(this._writeStream.buffer); + if (this._writeStream.buffer.remaining === 0) { + // If the write call consumed the complete buffer, we assume the message is sent now for + // at-most-once semantics. + message.isSent = true; + } + + if (!completedSynchronously) { + DEV: console.assert(!this._writeStream.isEmpty()); + return response; // not done + } } // The message was sent synchronously, notify the message, remove it from the sent queue and keep going. @@ -1204,7 +1221,14 @@ export class ConnectionI { TraceUtil.traceSend(stream, this._instance, this, this._logger, this._traceLevels); - if (this.write(stream.buffer, () => (message.isSent = true))) { + const completedSynchronously = this.write(stream.buffer); + if (stream.buffer.remaining === 0) { + // If the write call consumed the complete buffer, we assume the message is sent now for at-most-once + // semantics. + message.isSent = true; + } + + if (completedSynchronously) { // Entire buffer was written immediately. message.sent(); return AsyncStatus.Sent; @@ -1448,9 +1472,9 @@ export class ConnectionI { return ret; } - write(buffer, bufferFullyWritten) { + write(buffer) { const start = buffer.position; - const ret = this._transceiver.write(buffer, bufferFullyWritten); + const ret = this._transceiver.write(buffer); if (this._traceLevels.network >= 3 && buffer.position != start) { this._logger.trace( this._traceLevels.networkCat, diff --git a/js/src/Ice/IdleTimeoutTransceiverDecorator.js b/js/src/Ice/IdleTimeoutTransceiverDecorator.js index cfdc2965da8..1f07c42680f 100644 --- a/js/src/Ice/IdleTimeoutTransceiverDecorator.js +++ b/js/src/Ice/IdleTimeoutTransceiverDecorator.js @@ -41,9 +41,9 @@ export class IdleTimeoutTransceiverDecorator { this._decoratee.destroy(); } - write(buffer, bufferFullyWritten) { + write(buffer) { this.cancelWriteTimer(); - const completed = this._decoratee.write(buffer, bufferFullyWritten); + const completed = this._decoratee.write(buffer); if (completed) { // write completed this.rescheduleWriteTimer(); diff --git a/js/src/Ice/TcpEndpointI.js b/js/src/Ice/TcpEndpointI.js index f54b18cc8b3..2020762195e 100644 --- a/js/src/Ice/TcpEndpointI.js +++ b/js/src/Ice/TcpEndpointI.js @@ -112,10 +112,8 @@ export class TcpEndpointI extends IPEndpointI { } connectable() { - // // TCP endpoints are not connectable when running in a browser, SSL isn't currently supported. - // - return typeof TcpTransceiver !== null && !this.secure(); + return TcpTransceiver !== null && !this.secure(); } connect() { diff --git a/js/src/Ice/TcpTransceiver.js b/js/src/Ice/TcpTransceiver.js index 30ff59ad1ce..55db1948e46 100644 --- a/js/src/Ice/TcpTransceiver.js +++ b/js/src/Ice/TcpTransceiver.js @@ -145,15 +145,13 @@ if (typeof net.createConnection === "function") { * Write the given byte buffer to the socket. The buffer is written using multiple socket write calls. * * @param byteBuffer the byte buffer to write. - * @param bufferFullyWritten a callback that is called when the buffer has been fully written. - * @returns Whether or not the write completed synchronously. + * @returns Whether or not the write operation completed synchronously. **/ - write(byteBuffer, bufferFullyWritten) { + write(byteBuffer) { if (this._exception) { throw this._exception; } - DEV: console.assert(bufferFullyWritten); let packetSize = byteBuffer.remaining; DEV: console.assert(packetSize > 0); @@ -163,10 +161,6 @@ if (typeof net.createConnection === "function") { while (packetSize > 0) { const slice = byteBuffer.b.slice(byteBuffer.position, byteBuffer.position + packetSize); - - if (packetSize === byteBuffer.remaining) { - bufferFullyWritten(); - } let sync = true; sync = this._fd.write(Buffer.from(slice), null, () => { if (!sync) { diff --git a/js/src/Ice/WSTransceiver.js b/js/src/Ice/WSTransceiver.js index 6b6cc3fc4d2..9fd599c52dd 100644 --- a/js/src/Ice/WSTransceiver.js +++ b/js/src/Ice/WSTransceiver.js @@ -138,17 +138,15 @@ if (typeof WebSocket !== "undefined") { * Write the given byte buffer to the web socket. The buffer is written using multiple web socket send calls. * * @param byteBuffer the byte buffer to write. - * @param bufferFullyWritten a callback that is called when the buffer has been fully written. * @returns Whether or not the write completed synchronously. **/ - write(byteBuffer, bufferFullyWritten) { + write(byteBuffer) { if (this._exception) { throw this._exception; } else if (byteBuffer.remaining === 0) { return true; } DEV: console.assert(this._fd); - DEV: console.assert(bufferFullyWritten); const cb = () => { if (this._fd) { @@ -157,7 +155,7 @@ if (typeof WebSocket !== "undefined") { ? this._maxSendPacketSize : byteBuffer.remaining; if (this._fd.bufferedAmount + packetSize <= this._maxSendPacketSize) { - this._bytesWrittenCallback(0, 0); + this._bytesWrittenCallback(); } else { Timer.setTimeout(cb, this.writeReadyTimeout()); } @@ -179,9 +177,7 @@ if (typeof WebSocket !== "undefined") { } this._writeReadyTimeout = 0; const slice = byteBuffer.b.slice(byteBuffer.position, byteBuffer.position + packetSize); - if (packetSize === byteBuffer.remaining) { - bufferFullyWritten(); - } + this._fd.send(slice); byteBuffer.position += packetSize;