Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't feed SCTP packets to the SCTP stack after it's been closed. #2221

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,7 @@ class Endpoint @JvmOverloads constructor(
sctpHandler?.stop()
usrSctpHandler?.stop()
sctpManager?.closeConnection()
sctpTransport?.socket?.close()
sctpTransport?.stop()
} catch (t: Throwable) {
logger.error("Exception while expiring: ", t)
}
Expand Down Expand Up @@ -1240,7 +1240,7 @@ class Endpoint @JvmOverloads constructor(
}

override fun OnAborted(error: ErrorKind, message: String) {
logger.warn("SCTP aborted with error $error: $message")
logger.info("SCTP aborted with error $error: $message")
}

override fun OnConnected() {
Expand All @@ -1249,7 +1249,7 @@ class Endpoint @JvmOverloads constructor(
val dataChannelStack = DataChannelStack(
{ data, sid, ppid ->
val message = DcSctpMessage(sid.toShort(), ppid, data.array())
val status = sctpTransport?.socket?.send(message, DcSctpTransport.DEFAULT_SEND_OPTIONS)
val status = sctpTransport?.send(message, DcSctpTransport.DEFAULT_SEND_OPTIONS)
return@DataChannelStack if (status == SendStatus.kSuccess) {
0
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
*/
package org.jitsi.videobridge.dcsctp

import org.jitsi.dcsctp4j.DcSctpMessage
import org.jitsi.dcsctp4j.DcSctpOptions
import org.jitsi.dcsctp4j.DcSctpSocketCallbacks
import org.jitsi.dcsctp4j.DcSctpSocketFactory
import org.jitsi.dcsctp4j.DcSctpSocketInterface
import org.jitsi.dcsctp4j.SendOptions
import org.jitsi.dcsctp4j.SendStatus
import org.jitsi.dcsctp4j.Timeout
import org.jitsi.nlj.PacketInfo
import org.jitsi.utils.OrderedJsonObject
Expand All @@ -40,19 +42,51 @@ class DcSctpTransport(
parentLogger: Logger
) {
val logger = createChildLogger(parentLogger)
lateinit var socket: DcSctpSocketInterface
private val lock = Any()
private var socket: DcSctpSocketInterface? = null

fun start(callbacks: DcSctpSocketCallbacks, options: DcSctpOptions = DEFAULT_SOCKET_OPTIONS) {
socket = factory.create(name, callbacks, null, options)
synchronized(lock) {
socket = factory.create(name, callbacks, null, options)
}
}

fun handleIncomingSctp(packetInfo: PacketInfo) {
val packet = packetInfo.packet
socket.receivePacket(packet.getBuffer(), packet.getOffset(), packet.getLength())
synchronized(lock) {
socket?.receivePacket(packet.getBuffer(), packet.getOffset(), packet.getLength())
bgrozev marked this conversation as resolved.
Show resolved Hide resolved
}
}

fun stop() {
synchronized(lock) {
socket?.close()
socket = null
}
}

fun connect() {
synchronized(lock) {
socket?.connect()
bgrozev marked this conversation as resolved.
Show resolved Hide resolved
}
}

fun send(message: DcSctpMessage, options: SendOptions): SendStatus {
synchronized(lock) {
return socket?.send(message, options) ?: SendStatus.kErrorShuttingDown
}
}

fun handleTimeout(timeoutId: Long) {
synchronized(lock) {
socket?.handleTimeout(timeoutId)
}
}

fun getDebugState(): OrderedJsonObject {
val metrics = socket.metrics
val metrics = synchronized(lock) {
socket?.metrics
}
return OrderedJsonObject().apply {
if (metrics != null) {
put("tx_packets_count", metrics.txPacketsCount)
Expand Down Expand Up @@ -157,7 +191,7 @@ abstract class DcSctpBaseCallbacks(
scheduledFuture = TaskPools.SCHEDULED_POOL.schedule({
/* Execute it on the IO_POOL, because a timer may trigger sending new SCTP packets. */
future = TaskPools.IO_POOL.submit {
transport?.socket?.handleTimeout(timeoutId)
transport?.handleTimeout(timeoutId)
}
}, duration, TimeUnit.MILLISECONDS)
} catch (e: Throwable) {
Expand Down
10 changes: 5 additions & 5 deletions jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ class Relay @JvmOverloads constructor(
scheduleRelayMessageTransportTimeout()
} else if (sctpConfig.enabled) {
if (sctpRole == Sctp.Role.CLIENT) {
sctpTransport!!.socket.connect()
sctpTransport!!.connect()
}
}
}
Expand Down Expand Up @@ -498,7 +498,7 @@ class Relay @JvmOverloads constructor(
it.start(SctpCallbacks(it))
sctpHandler!!.setSctpTransport(it)
if (dtlsTransport.isConnected && sctpDesc.role == Sctp.Role.CLIENT) {
it.socket.connect()
it.connect()
}
}
}
Expand Down Expand Up @@ -1172,7 +1172,7 @@ class Relay @JvmOverloads constructor(
sctpHandler?.stop()
usrSctpHandler?.stop()
sctpManager?.closeConnection()
sctpTransport?.socket?.close()
sctpTransport?.stop()
} catch (t: Throwable) {
logger.error("Exception while expiring: ", t)
}
Expand Down Expand Up @@ -1266,7 +1266,7 @@ class Relay @JvmOverloads constructor(
}

override fun OnAborted(error: ErrorKind, message: String) {
logger.warn("SCTP aborted with error $error: $message")
logger.info("SCTP aborted with error $error: $message")
}

override fun OnConnected() {
Expand All @@ -1275,7 +1275,7 @@ class Relay @JvmOverloads constructor(
val dataChannelStack = DataChannelStack(
{ data, sid, ppid ->
val message = DcSctpMessage(sid.toShort(), ppid, data.array())
val status = sctpTransport?.socket?.send(message, DcSctpTransport.DEFAULT_SEND_OPTIONS)
val status = sctpTransport?.send(message, DcSctpTransport.DEFAULT_SEND_OPTIONS)
return@DataChannelStack if (status == SendStatus.kSuccess) {
0
} else {
Expand Down
Loading