Skip to content

Commit

Permalink
fix: Fix a leak of WebSocketClients. (#2051)
Browse files Browse the repository at this point in the history
Every time we establish an active WS to another relay we create a new
WebSocketClient instance. This comes with its own QueuedThreadPool with 8 threads
and if we lose the reference without calling stop() it leaks.

When the existing web socket closing arrives before the signaling that
expires the relay, we try to call WebSocketClient.stop() and then
doConnect(). The call to stop() can take a while, and if in the meantime
the relay is expired in signaling we end up running doConnect() on
an expired relay. In this case we create a new WebSocketClient, which
is never stopped.

This fix uses a single WebSocketClient instance shared between all
relays, and does not attempt to re-connect if the reason for the
disconnect is RELAY_CLOSED. It re-uses the "webSocket: ColibriWebSocket"
field for sockets created both actively and passively.
  • Loading branch information
bgrozev authored Sep 25, 2023
1 parent 8983b11 commit 37da7e1
Showing 1 changed file with 24 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import org.jitsi.videobridge.message.EndpointStats
import org.jitsi.videobridge.message.ServerHelloMessage
import org.jitsi.videobridge.message.SourceVideoTypeMessage
import org.jitsi.videobridge.message.VideoTypeMessage
import org.jitsi.videobridge.util.TaskPools
import org.jitsi.videobridge.websocket.ColibriWebSocket
import org.json.simple.JSONObject
import java.lang.ref.WeakReference
Expand Down Expand Up @@ -67,11 +66,6 @@ class RelayMessageTransport(
*/
private var url: String? = null

/**
* An active websocket client.
*/
private var outgoingWebsocket: WebSocketClient? = null

/**
* Use to synchronize access to [webSocket]
*/
Expand Down Expand Up @@ -113,15 +107,11 @@ class RelayMessageTransport(
webSocket = null
}

// this.webSocket should only be initialized when it has connected (via [webSocketConnected]).
val newWebSocket = ColibriWebSocket(relay.id, this)
outgoingWebsocket?.let {
logger.warn("Re-connecting while outgoingWebsocket != null, possible leak.")
it.stop()
}
outgoingWebsocket = WebSocketClient().also {
it.start()
it.connect(newWebSocket, URI(url), ClientUpgradeRequest())
ColibriWebSocket(relay.id, this).also {
webSocketClient.connect(it, URI(url), ClientUpgradeRequest())
synchronized(webSocketSyncRoot) {
webSocket = it
}
}
}

Expand Down Expand Up @@ -311,15 +301,15 @@ class RelayMessageTransport(
get() = getActiveTransportChannel() != null

val isActive: Boolean
get() = outgoingWebsocket != null
get() = url != null

/**
* {@inheritDoc}
*/
override fun webSocketConnected(ws: ColibriWebSocket) {
synchronized(webSocketSyncRoot) {
// If we already have a web-socket, discard it and use the new one.
if (ws != webSocket) {
if (ws != webSocket && webSocket != null) {
logger.info("Replacing an existing websocket.")
webSocket?.session?.close(CloseStatus.NORMAL, "replaced")
webSocketLastActive = true
Expand Down Expand Up @@ -355,10 +345,10 @@ class RelayMessageTransport(
logger.debug { "Web socket closed, statusCode $statusCode ( $reason)." }
}
}
outgoingWebsocket?.let {
// Try to reconnect. TODO: how to handle failures?
it.stop()
outgoingWebsocket = null

// This check avoids trying to establish a new WS when the closing of the existing WS races the signaling to
// expire the relay. 1001 with RELAY_CLOSED means that the remote side willingly closed the socket.
if (statusCode != 1001 || reason != RELAY_CLOSED) {
doConnect()
}
}
Expand All @@ -374,22 +364,11 @@ class RelayMessageTransport(
if (webSocket != null) {
// 410 Gone indicates that the resource requested is no longer
// available and will not be available again.
webSocket?.session?.close(CloseStatus.SHUTDOWN, "relay closed")
webSocket?.session?.close(CloseStatus.SHUTDOWN, RELAY_CLOSED)
webSocket = null
logger.debug { "Relay expired, closed colibri web-socket." }
}
}
outgoingWebsocket?.let {
// Stopping might block and we don't want to hold the thread processing signaling.
TaskPools.IO_POOL.submit {
try {
it.stop()
} catch (e: Exception) {
logger.warn("Error while stopping outgoing web socket", e)
}
}
}
outgoingWebsocket = null
}

/**
Expand Down Expand Up @@ -509,4 +488,16 @@ class RelayMessageTransport(
conference.sendMessageFromRelay(message, true, relay.meshId)
return null
}

companion object {
    /**
     * Close reason sent when a web socket is shut down because its relay has expired.
     */
    const val RELAY_CLOSED = "relay_closed"

    /**
     * One [WebSocketClient] shared by every [Relay] for initiating outgoing web socket connections.
     * It is created and started as soon as this companion object is initialized.
     */
    val webSocketClient: WebSocketClient = WebSocketClient().also { it.start() }
}
}

0 comments on commit 37da7e1

Please sign in to comment.