Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix JavaScript connection scheduling of heartbeats #3164

Merged
merged 1 commit into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 41 additions & 17 deletions js/src/Ice/ConnectionI.js
Original file line number Diff line number Diff line change
Expand Up @@ -415,14 +415,26 @@ export class ConnectionI {
let info = null;
let response = null;
try {
if ((operation & SocketOperation.Write) !== 0 && this._writeStream.buffer.remaining > 0) {
DEV: console.assert(this._sendStreams.length > 0);
const currentMessage = this._sendStreams[0];
if (!this.write(this._writeStream.buffer, () => (currentMessage.isSent = true))) {
DEV: console.assert(!this._writeStream.isEmpty());
return;
if ((operation & SocketOperation.Write) !== 0) {
if (this._writeStream.buffer.remaining > 0) {
DEV: console.assert(this._sendStreams.length > 0);
const completedSynchronously = this.write(this._writeStream.buffer);
if (this._writeStream.buffer.remaining === 0) {
// If the write call consumed the complete buffer, we assume the message is sent now for
// at-most-once semantics.
this._sendStreams[0].isSent = true;
}
Comment on lines +422 to +426
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This replaces the previous callback passed to write.


if (!completedSynchronously) {
DEV: console.assert(!this._writeStream.isEmpty());
return;
}
DEV: console.assert(this._writeStream.buffer.remaining === 0);
} else if (this._transceiver instanceof IdleTimeoutTransceiverDecorator) {
// The writing of the current message completed, schedule a heartbeat in case the connection has
// nothing more to write.
this._transceiver.scheduleHeartbeat();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ensure that a heartbeat is scheduled even if write completes asynchronously. In this case Transceiver.write is not called and the decorator would not schedule one.

}
DEV: console.assert(this._writeStream.buffer.remaining === 0);
}
if ((operation & SocketOperation.Read) !== 0 && !this._readStream.isEmpty()) {
if (this._readHeader) {
Expand Down Expand Up @@ -1156,13 +1168,18 @@ export class ConnectionI {
this._writeStream.swap(message.stream);

// Send the message.
const currentMessage = message;
if (
this._writeStream.pos != this._writeStream.size &&
!this.write(this._writeStream.buffer, () => (currentMessage.isSent = true))
) {
DEV: console.assert(!this._writeStream.isEmpty());
return response; // not done
if (this._writeStream.pos != this._writeStream.size) {
const completedSynchronously = this.write(this._writeStream.buffer);
if (this._writeStream.buffer.remaining === 0) {
// If the write call consumed the complete buffer, we assume the message is sent now for
// at-most-once semantics.
message.isSent = true;
}

if (!completedSynchronously) {
DEV: console.assert(!this._writeStream.isEmpty());
return response; // not done
}
}

// The message was sent synchronously, notify the message, remove it from the sent queue and keep going.
Expand Down Expand Up @@ -1204,7 +1221,14 @@ export class ConnectionI {

TraceUtil.traceSend(stream, this._instance, this, this._logger, this._traceLevels);

if (this.write(stream.buffer, () => (message.isSent = true))) {
const completedSynchronously = this.write(stream.buffer);
if (stream.buffer.remaining === 0) {
// If the write call consumed the complete buffer, we assume the message is sent now for at-most-once
// semantics.
message.isSent = true;
}

if (completedSynchronously) {
// Entire buffer was written immediately.
message.sent();
return AsyncStatus.Sent;
Expand Down Expand Up @@ -1448,9 +1472,9 @@ export class ConnectionI {
return ret;
}

write(buffer, bufferFullyWritten) {
write(buffer) {
const start = buffer.position;
const ret = this._transceiver.write(buffer, bufferFullyWritten);
const ret = this._transceiver.write(buffer);
if (this._traceLevels.network >= 3 && buffer.position != start) {
this._logger.trace(
this._traceLevels.networkCat,
Expand Down
4 changes: 2 additions & 2 deletions js/src/Ice/IdleTimeoutTransceiverDecorator.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ export class IdleTimeoutTransceiverDecorator {
this._decoratee.destroy();
}

write(buffer, bufferFullyWritten) {
write(buffer) {
this.cancelWriteTimer();
const completed = this._decoratee.write(buffer, bufferFullyWritten);
const completed = this._decoratee.write(buffer);
if (completed) {
// write completed
this.rescheduleWriteTimer();
Expand Down
4 changes: 1 addition & 3 deletions js/src/Ice/TcpEndpointI.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,8 @@ export class TcpEndpointI extends IPEndpointI {
}

connectable() {
//
// TCP endpoints are not connectable when running in a browser, SSL isn't currently supported.
//
return typeof TcpTransceiver !== null && !this.secure();
return TcpTransceiver !== null && !this.secure();
}

connect() {
Expand Down
10 changes: 2 additions & 8 deletions js/src/Ice/TcpTransceiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,13 @@ if (typeof net.createConnection === "function") {
* Write the given byte buffer to the socket. The buffer is written using multiple socket write calls.
*
* @param byteBuffer the byte buffer to write.
* @param bufferFullyWritten a callback that is called when the buffer has been fully written.
* @returns Whether or not the write completed synchronously.
* @returns Whether or not the write operation completed synchronously.
**/
write(byteBuffer, bufferFullyWritten) {
write(byteBuffer) {
if (this._exception) {
throw this._exception;
}

DEV: console.assert(bufferFullyWritten);
let packetSize = byteBuffer.remaining;
DEV: console.assert(packetSize > 0);

Expand All @@ -163,10 +161,6 @@ if (typeof net.createConnection === "function") {

while (packetSize > 0) {
const slice = byteBuffer.b.slice(byteBuffer.position, byteBuffer.position + packetSize);

if (packetSize === byteBuffer.remaining) {
bufferFullyWritten();
}
let sync = true;
sync = this._fd.write(Buffer.from(slice), null, () => {
if (!sync) {
Expand Down
10 changes: 3 additions & 7 deletions js/src/Ice/WSTransceiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,15 @@ if (typeof WebSocket !== "undefined") {
* Write the given byte buffer to the web socket. The buffer is written using multiple web socket send calls.
*
* @param byteBuffer the byte buffer to write.
* @param bufferFullyWritten a callback that is called when the buffer has been fully written.
* @returns Whether or not the write completed synchronously.
**/
write(byteBuffer, bufferFullyWritten) {
write(byteBuffer) {
if (this._exception) {
throw this._exception;
} else if (byteBuffer.remaining === 0) {
return true;
}
DEV: console.assert(this._fd);
DEV: console.assert(bufferFullyWritten);

const cb = () => {
if (this._fd) {
Expand All @@ -157,7 +155,7 @@ if (typeof WebSocket !== "undefined") {
? this._maxSendPacketSize
: byteBuffer.remaining;
if (this._fd.bufferedAmount + packetSize <= this._maxSendPacketSize) {
this._bytesWrittenCallback(0, 0);
this._bytesWrittenCallback();
} else {
Timer.setTimeout(cb, this.writeReadyTimeout());
}
Expand All @@ -179,9 +177,7 @@ if (typeof WebSocket !== "undefined") {
}
this._writeReadyTimeout = 0;
const slice = byteBuffer.b.slice(byteBuffer.position, byteBuffer.position + packetSize);
if (packetSize === byteBuffer.remaining) {
bufferFullyWritten();
}

this._fd.send(slice);
byteBuffer.position += packetSize;

Expand Down