From 37da7e13f2afea81128e5bf6da17dd431764a6a0 Mon Sep 17 00:00:00 2001 From: bgrozev Date: Mon, 25 Sep 2023 09:34:59 -0500 Subject: [PATCH] fix: Fix a leak of WebSocketClients. (#2051) Every time we establish an active WS to another relay we create a new WebSocketClient instance. This comes with its own QueuedThreadPool with 8 threads and if we lose the reference without calling stop() it leaks. When the existing web socket closing arrives before the signaling that expires the relay, we try to do WebSocketClient.stop() and then doConnect(). The call to stop() can take a while, and if in the meantime the relay is expired in signaling we end up running doConnect() on an expired relay. In this case we create a new WebSocketClient, which is never stopped. This fix uses a single WebSocketClient instance shared between all relays, and does not attempt to re-connect if the reason for the disconnect is RELAY_CLOSED. It re-uses the "webSocket: ColibriWebSocket" field for sockets created both actively and passively. --- .../relay/RelayMessageTransport.kt | 57 ++++++++----------- 1 file changed, 24 insertions(+), 33 deletions(-) diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/relay/RelayMessageTransport.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/relay/RelayMessageTransport.kt index edb0a978ca..699dc17bd4 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/relay/RelayMessageTransport.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/relay/RelayMessageTransport.kt @@ -36,7 +36,6 @@ import org.jitsi.videobridge.message.EndpointStats import org.jitsi.videobridge.message.ServerHelloMessage import org.jitsi.videobridge.message.SourceVideoTypeMessage import org.jitsi.videobridge.message.VideoTypeMessage -import org.jitsi.videobridge.util.TaskPools import org.jitsi.videobridge.websocket.ColibriWebSocket import org.json.simple.JSONObject import java.lang.ref.WeakReference @@ -67,11 +66,6 @@ class RelayMessageTransport( */ private var url: String? 
= null - /** - * An active websocket client. - */ - private var outgoingWebsocket: WebSocketClient? = null - /** * Use to synchronize access to [webSocket] */ @@ -113,15 +107,11 @@ class RelayMessageTransport( webSocket = null } - // this.webSocket should only be initialized when it has connected (via [webSocketConnected]). - val newWebSocket = ColibriWebSocket(relay.id, this) - outgoingWebsocket?.let { - logger.warn("Re-connecting while outgoingWebsocket != null, possible leak.") - it.stop() - } - outgoingWebsocket = WebSocketClient().also { - it.start() - it.connect(newWebSocket, URI(url), ClientUpgradeRequest()) + ColibriWebSocket(relay.id, this).also { + webSocketClient.connect(it, URI(url), ClientUpgradeRequest()) + synchronized(webSocketSyncRoot) { + webSocket = it + } } } @@ -311,7 +301,7 @@ class RelayMessageTransport( get() = getActiveTransportChannel() != null val isActive: Boolean - get() = outgoingWebsocket != null + get() = url != null /** * {@inheritDoc} @@ -319,7 +309,7 @@ class RelayMessageTransport( override fun webSocketConnected(ws: ColibriWebSocket) { synchronized(webSocketSyncRoot) { // If we already have a web-socket, discard it and use the new one. - if (ws != webSocket) { + if (ws != webSocket && webSocket != null) { logger.info("Replacing an existing websocket.") webSocket?.session?.close(CloseStatus.NORMAL, "replaced") webSocketLastActive = true @@ -355,10 +345,10 @@ class RelayMessageTransport( logger.debug { "Web socket closed, statusCode $statusCode ( $reason)." } } } - outgoingWebsocket?.let { - // Try to reconnect. TODO: how to handle failures? - it.stop() - outgoingWebsocket = null + + // This check avoids trying to establish a new WS when the closing of the existing WS races the signaling to + // expire the relay. 1001 with RELAY_CLOSED means that the remote side willingly closed the socket. 
+ if (statusCode != 1001 || reason != RELAY_CLOSED) { doConnect() } } @@ -374,22 +364,11 @@ class RelayMessageTransport( if (webSocket != null) { // 410 Gone indicates that the resource requested is no longer // available and will not be available again. - webSocket?.session?.close(CloseStatus.SHUTDOWN, "relay closed") + webSocket?.session?.close(CloseStatus.SHUTDOWN, RELAY_CLOSED) webSocket = null logger.debug { "Relay expired, closed colibri web-socket." } } } - outgoingWebsocket?.let { - // Stopping might block and we don't want to hold the thread processing signaling. - TaskPools.IO_POOL.submit { - try { - it.stop() - } catch (e: Exception) { - logger.warn("Error while stopping outgoing web socket", e) - } - } - } - outgoingWebsocket = null } /** @@ -509,4 +488,16 @@ class RelayMessageTransport( conference.sendMessageFromRelay(message, true, relay.meshId) return null } + + companion object { + /** + * The single [WebSocketClient] instance that all [Relay]s use to initiate a web socket connection. + */ + val webSocketClient = WebSocketClient().apply { start() } + + /** + * Reason to use when closing a WS due to the relay being expired. + */ + const val RELAY_CLOSED = "relay_closed" + } }