Skip to content

Commit

Permalink
Fix WebSocket threads CPU usage
Browse files Browse the repository at this point in the history
  • Loading branch information
afondard committed May 21, 2024
1 parent 3233c03 commit 11a633e
Showing 1 changed file with 13 additions and 18 deletions.
31 changes: 13 additions & 18 deletions src/main/kotlin/io/kuzzle/sdk/protocol/WebSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,14 @@ import java.io.IOException
import java.net.ConnectException
import java.net.SocketException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.thread

open class WebSocket : AbstractProtocol {
protected open var ws: DefaultClientWebSocketSession? = null
private val host: String
private val port: Int
private val isSsl: Boolean
private val queue: ConcurrentLinkedQueue<String> = ConcurrentLinkedQueue()
private val queue: ArrayDeque<String> = ArrayDeque()
override var state: ProtocolState = ProtocolState.CLOSE
private val autoReconnect: Boolean
private val reconnectionDelay: Long
Expand Down Expand Up @@ -114,13 +112,11 @@ open class WebSocket : AbstractProtocol {
state = ProtocolState.OPEN
trigger(NetworkStateChangeEvent(ProtocolState.OPEN))

thread(start = true) {
while (ws != null) {
val payload = queue.poll()
if (payload != null) {
GlobalScope.launch {
ws?.send(Frame.Text(payload))
}
while (queue.isNotEmpty()) {
val payload = queue.removeFirstOrNull()
if (payload != null) {
GlobalScope.launch {
ws?.send(Frame.Text(payload))
}
}
}
Expand Down Expand Up @@ -176,13 +172,6 @@ open class WebSocket : AbstractProtocol {

// On connection success
stopRetryingToConnect.set(false)

// This thread is here to let JAVA run until the socket is closed
// In Kotlin this is handled by the block function above but for some reason in JAVA it is
// non blocking.
thread(start = true) {
while (state != ProtocolState.CLOSE) {}
}
} catch (e: Exception) {
when (e) {
is ConnectException,
Expand Down Expand Up @@ -223,6 +212,12 @@ open class WebSocket : AbstractProtocol {
}

override fun send(payload: KuzzleMap) {
queue.add(JsonSerializer.serialize(payload))
if (state == ProtocolState.RECONNECTING) {
queue.add(JsonSerializer.serialize(payload))
} else {
GlobalScope.launch {
ws?.send(Frame.Text(JsonSerializer.serialize(payload)))
}
}
}
}

0 comments on commit 11a633e

Please sign in to comment.