Skip to content

Commit

Permalink
feat: Use the ice4j push API. (#2195)
Browse files Browse the repository at this point in the history
* feat: Use the ice4j push API.
* feat: Put incoming DTLS packets on a queue.
* Start writing to the ICE transport as soon as we have any validated pair.
* chore: Bump to ice4j 3.2

---------

Co-authored-by: Jonathan Lennox <[email protected]>
  • Loading branch information
bgrozev and JonathanLennox authored Oct 2, 2024
1 parent 93b2699 commit 0b2e8d6
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 58 deletions.
32 changes: 12 additions & 20 deletions jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.jitsi.videobridge

import org.ice4j.util.Buffer
import org.jitsi.config.JitsiConfig
import org.jitsi.dcsctp4j.DcSctpMessage
import org.jitsi.dcsctp4j.ErrorKind
Expand Down Expand Up @@ -45,13 +46,11 @@ import org.jitsi.nlj.util.NEVER
import org.jitsi.nlj.util.PacketInfoQueue
import org.jitsi.nlj.util.RemoteSsrcAssociation
import org.jitsi.nlj.util.sumOf
import org.jitsi.rtp.Packet
import org.jitsi.rtp.UnparsedPacket
import org.jitsi.rtp.rtcp.RtcpSrPacket
import org.jitsi.rtp.rtcp.rtcpfb.RtcpFbPacket
import org.jitsi.rtp.rtcp.rtcpfb.payload_specific_fb.RtcpFbFirPacket
import org.jitsi.rtp.rtcp.rtcpfb.payload_specific_fb.RtcpFbPliPacket
import org.jitsi.rtp.rtp.RtpPacket
import org.jitsi.utils.MediaType
import org.jitsi.utils.concurrent.RecurringRunnableExecutor
import org.jitsi.utils.logging2.Logger
Expand Down Expand Up @@ -166,7 +165,7 @@ class Endpoint @JvmOverloads constructor(

/* TODO: do we ever want to support useUniquePort for an Endpoint? */
private val iceTransport = IceTransport(id, iceControlling, false, supportsPrivateAddresses, logger)
private val dtlsTransport = DtlsTransport(logger).also { it.cryptex = CryptexConfig.endpoint }
private val dtlsTransport = DtlsTransport(logger, id).also { it.cryptex = CryptexConfig.endpoint }

private var cryptex: Boolean = CryptexConfig.endpoint

Expand Down Expand Up @@ -407,41 +406,34 @@ class Endpoint @JvmOverloads constructor(

private fun setupIceTransport() {
iceTransport.incomingDataHandler = object : IceTransport.IncomingDataHandler {
override fun dataReceived(data: ByteArray, offset: Int, length: Int, receivedTime: Instant) {
// DTLS data will be handled by the DtlsTransport, but SRTP data can go
// straight to the transceiver
if (looksLikeDtls(data, offset, length)) {
// DTLS transport is responsible for making its own copy, because it will manage its own
// buffers
dtlsTransport.dtlsDataReceived(data, offset, length)
override fun dataReceived(buffer: Buffer) {
if (looksLikeDtls(buffer.buffer, buffer.offset, buffer.length)) {
// DTLS transport is responsible for making its own copy, because it will manage its own buffers
dtlsTransport.enqueueBuffer(buffer)
} else {
val copy = ByteBufferPool.getBuffer(
length +
RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET +
Packet.BYTES_TO_LEAVE_AT_END_OF_PACKET
)
System.arraycopy(data, offset, copy, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, length)
val pktInfo =
PacketInfo(UnparsedPacket(copy, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, length)).apply {
this.receivedTime = receivedTime
PacketInfo(UnparsedPacket(buffer.buffer, buffer.offset, buffer.length)).apply {
this.receivedTime = buffer.receivedTime
}
transceiver.handleIncomingPacket(pktInfo)
}
}
}
iceTransport.eventHandler = object : IceTransport.EventHandler {
override fun connected() {
override fun writeable() {
logger.info("ICE connected")
transceiver.setOutgoingPacketHandler(object : PacketHandler {
override fun processPacket(packetInfo: PacketInfo) {
packetInfo.addEvent(SRTP_QUEUE_ENTRY_EVENT)
outgoingSrtpPacketQueue.add(packetInfo)
}
})
TaskPools.IO_POOL.execute(iceTransport::startReadingData)
TaskPools.IO_POOL.execute(dtlsTransport::startDtlsHandshake)
}

override fun connected() {
}

override fun failed() {
}

Expand Down
14 changes: 14 additions & 0 deletions jvb/src/main/kotlin/org/jitsi/videobridge/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package org.jitsi.videobridge

import org.eclipse.jetty.servlet.ServletHolder
import org.glassfish.jersey.servlet.ServletContainer
import org.ice4j.ice.harvest.AbstractUdpListener
import org.ice4j.ice.harvest.MappingCandidateHarvesters
import org.ice4j.util.Buffer
import org.jitsi.config.JitsiConfig
import org.jitsi.metaconfig.ConfigException
import org.jitsi.metaconfig.MetaconfigLogger
Expand All @@ -29,6 +31,8 @@ import org.jitsi.rest.createServer
import org.jitsi.rest.enableCors
import org.jitsi.rest.isEnabled
import org.jitsi.rest.servletContextHandler
import org.jitsi.rtp.Packet
import org.jitsi.rtp.rtp.RtpPacket
import org.jitsi.shutdown.ShutdownServiceImpl
import org.jitsi.utils.logging2.LoggerImpl
import org.jitsi.utils.queue.PacketQueue
Expand Down Expand Up @@ -220,6 +224,7 @@ private fun getSystemPropertyDefaults(): Map<String, String> {
}

private fun startIce4j() {
AbstractUdpListener.USE_PUSH_API = true
// Start the initialization of the mapping candidate harvesters.
// Asynchronous, because the AWS and STUN harvester may take a long
// time to initialize.
Expand All @@ -239,4 +244,13 @@ private fun setupBufferPools() {
org.jitsi.rtp.util.BufferPool.returnArray = { ByteBufferPool.returnBuffer(it) }
org.jitsi.nlj.util.BufferPool.getBuffer = { ByteBufferPool.getBuffer(it) }
org.jitsi.nlj.util.BufferPool.returnBuffer = { ByteBufferPool.returnBuffer(it) }
org.ice4j.util.BufferPool.getBuffer = { len ->
val b = ByteBufferPool.getBuffer(len)
Buffer(b, 0, b.size)
}
org.ice4j.util.BufferPool.returnBuffer = { ByteBufferPool.returnBuffer(it.buffer) }
org.ice4j.ice.harvest.AbstractUdpListener.BYTES_TO_LEAVE_AT_START_OF_PACKET =
RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET
org.ice4j.ice.harvest.AbstractUdpListener.BYTES_TO_LEAVE_AT_END_OF_PACKET =
Packet.BYTES_TO_LEAVE_AT_END_OF_PACKET
}
28 changes: 11 additions & 17 deletions jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.jitsi.videobridge.relay

import org.ice4j.util.Buffer
import org.jitsi.dcsctp4j.DcSctpMessage
import org.jitsi.dcsctp4j.ErrorKind
import org.jitsi.dcsctp4j.SendPacketStatus
Expand Down Expand Up @@ -47,7 +48,6 @@ import org.jitsi.nlj.util.LocalSsrcAssociation
import org.jitsi.nlj.util.PacketInfoQueue
import org.jitsi.nlj.util.RemoteSsrcAssociation
import org.jitsi.nlj.util.sumOf
import org.jitsi.rtp.Packet
import org.jitsi.rtp.UnparsedPacket
import org.jitsi.rtp.extensions.looksLikeRtcp
import org.jitsi.rtp.extensions.looksLikeRtp
Expand Down Expand Up @@ -220,7 +220,7 @@ class Relay @JvmOverloads constructor(
clock = clock
)

private val dtlsTransport = DtlsTransport(logger).also { it.cryptex = CryptexConfig.relay }
private val dtlsTransport = DtlsTransport(logger, id).also { it.cryptex = CryptexConfig.relay }

private var cryptex = CryptexConfig.relay

Expand Down Expand Up @@ -365,44 +365,38 @@ class Relay @JvmOverloads constructor(

private fun setupIceTransport() {
iceTransport.incomingDataHandler = object : IceTransport.IncomingDataHandler {
override fun dataReceived(data: ByteArray, offset: Int, length: Int, receivedTime: Instant) {
override fun dataReceived(buffer: Buffer) {
// DTLS data will be handled by the DtlsTransport, but SRTP data can go
// straight to the transceiver
if (looksLikeDtls(data, offset, length)) {
// DTLS transport is responsible for making its own copy, because it will manage its own
// buffers
dtlsTransport.dtlsDataReceived(data, offset, length)
if (looksLikeDtls(buffer.buffer, buffer.offset, buffer.length)) {
// DTLS transport is responsible for making its own copy, because it will manage its own buffers
dtlsTransport.enqueueBuffer(buffer)
} else {
val copy = ByteBufferPool.getBuffer(
length +
RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET +
Packet.BYTES_TO_LEAVE_AT_END_OF_PACKET
)
System.arraycopy(data, offset, copy, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, length)
val pktInfo =
RelayedPacketInfo(
UnparsedPacket(copy, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, length),
UnparsedPacket(buffer.buffer, buffer.offset, buffer.length),
meshId
).apply {
this.receivedTime = receivedTime
this.receivedTime = buffer.receivedTime
}
handleMediaPacket(pktInfo)
}
}
}
iceTransport.eventHandler = object : IceTransport.EventHandler {
override fun connected() {
override fun writeable() {
logger.info("ICE connected")
transceiver.setOutgoingPacketHandler(object : PacketHandler {
override fun processPacket(packetInfo: PacketInfo) {
packetInfo.addEvent(SRTP_QUEUE_ENTRY_EVENT)
outgoingSrtpPacketQueue.add(packetInfo)
}
})
TaskPools.IO_POOL.execute(iceTransport::startReadingData)
TaskPools.IO_POOL.execute(dtlsTransport::startDtlsHandshake)
}

override fun connected() {}

override fun failed() {}

override fun consentUpdated(time: Instant) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@

package org.jitsi.videobridge.transport.dtls

import org.ice4j.util.Buffer
import org.jitsi.nlj.dtls.DtlsClient
import org.jitsi.nlj.dtls.DtlsServer
import org.jitsi.nlj.dtls.DtlsStack
import org.jitsi.nlj.srtp.TlsRole
import org.jitsi.nlj.util.BufferPool
import org.jitsi.utils.OrderedJsonObject
import org.jitsi.utils.logging2.Logger
import org.jitsi.utils.logging2.createChildLogger
import org.jitsi.utils.queue.PacketQueue
import org.jitsi.videobridge.util.TaskPools
import org.jitsi.xmpp.extensions.jingle.DtlsFingerprintPacketExtension
import org.jitsi.xmpp.extensions.jingle.IceUdpTransportPacketExtension
import java.util.concurrent.atomic.AtomicBoolean
Expand All @@ -39,7 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean
* be passed to the [outgoingDataHandler], which should be set by an
* interested party.
*/
class DtlsTransport(parentLogger: Logger) {
class DtlsTransport(parentLogger: Logger, id: String) {
private val logger = createChildLogger(parentLogger)

private val running = AtomicBoolean(true)
Expand All @@ -62,6 +66,26 @@ class DtlsTransport(parentLogger: Logger) {
/** Whether to advertise cryptex to peers. */
var cryptex = false

val dtlsQueue = object : PacketQueue<Buffer>(
128,
null,
"dtls-queue-$id",
{ buffer: Buffer ->
try {
dtlsDataReceived(buffer.buffer, buffer.offset, buffer.length)
true
} catch (e: Exception) {
logger.warn("Failed to handle DTLS data", e)
false
}
},
TaskPools.IO_POOL,
) {
override fun releasePacket(buffer: Buffer) {
BufferPool.returnBuffer(buffer.buffer)
}
}

/**
* The DTLS stack instance
*/
Expand Down Expand Up @@ -170,10 +194,13 @@ class DtlsTransport(parentLogger: Logger) {
}
}

fun enqueueBuffer(buffer: Buffer) = dtlsQueue.add(buffer)

/**
* Notify this layer that DTLS data has been received from the network
*/
fun dtlsDataReceived(data: ByteArray, off: Int, len: Int) = dtlsStack.processIncomingProtocolData(data, off, len)
private fun dtlsDataReceived(data: ByteArray, off: Int, len: Int) =
dtlsStack.processIncomingProtocolData(data, off, len)

/**
* Send out DTLS data
Expand Down
Loading

0 comments on commit 0b2e8d6

Please sign in to comment.