Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use the ice4j push API. #2195

Merged
merged 5 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 12 additions & 20 deletions jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.jitsi.videobridge

import org.ice4j.util.Buffer
import org.jitsi.config.JitsiConfig
import org.jitsi.dcsctp4j.DcSctpMessage
import org.jitsi.dcsctp4j.ErrorKind
Expand Down Expand Up @@ -45,13 +46,11 @@ import org.jitsi.nlj.util.NEVER
import org.jitsi.nlj.util.PacketInfoQueue
import org.jitsi.nlj.util.RemoteSsrcAssociation
import org.jitsi.nlj.util.sumOf
import org.jitsi.rtp.Packet
import org.jitsi.rtp.UnparsedPacket
import org.jitsi.rtp.rtcp.RtcpSrPacket
import org.jitsi.rtp.rtcp.rtcpfb.RtcpFbPacket
import org.jitsi.rtp.rtcp.rtcpfb.payload_specific_fb.RtcpFbFirPacket
import org.jitsi.rtp.rtcp.rtcpfb.payload_specific_fb.RtcpFbPliPacket
import org.jitsi.rtp.rtp.RtpPacket
import org.jitsi.utils.MediaType
import org.jitsi.utils.concurrent.RecurringRunnableExecutor
import org.jitsi.utils.logging2.Logger
Expand Down Expand Up @@ -166,7 +165,7 @@ class Endpoint @JvmOverloads constructor(

/* TODO: do we ever want to support useUniquePort for an Endpoint? */
private val iceTransport = IceTransport(id, iceControlling, false, supportsPrivateAddresses, logger)
private val dtlsTransport = DtlsTransport(logger).also { it.cryptex = CryptexConfig.endpoint }
private val dtlsTransport = DtlsTransport(logger, id).also { it.cryptex = CryptexConfig.endpoint }

private var cryptex: Boolean = CryptexConfig.endpoint

Expand Down Expand Up @@ -407,41 +406,34 @@ class Endpoint @JvmOverloads constructor(

private fun setupIceTransport() {
iceTransport.incomingDataHandler = object : IceTransport.IncomingDataHandler {
override fun dataReceived(data: ByteArray, offset: Int, length: Int, receivedTime: Instant) {
// DTLS data will be handled by the DtlsTransport, but SRTP data can go
// straight to the transceiver
if (looksLikeDtls(data, offset, length)) {
// DTLS transport is responsible for making its own copy, because it will manage its own
// buffers
dtlsTransport.dtlsDataReceived(data, offset, length)
override fun dataReceived(buffer: Buffer) {
if (looksLikeDtls(buffer.buffer, buffer.offset, buffer.length)) {
// DTLS transport is responsible for making its own copy, because it will manage its own buffers
dtlsTransport.enqueueBuffer(buffer)
} else {
val copy = ByteBufferPool.getBuffer(
length +
RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET +
Packet.BYTES_TO_LEAVE_AT_END_OF_PACKET
)
System.arraycopy(data, offset, copy, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, length)
val pktInfo =
PacketInfo(UnparsedPacket(copy, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, length)).apply {
this.receivedTime = receivedTime
PacketInfo(UnparsedPacket(buffer.buffer, buffer.offset, buffer.length)).apply {
this.receivedTime = buffer.receivedTime
}
transceiver.handleIncomingPacket(pktInfo)
}
}
}
iceTransport.eventHandler = object : IceTransport.EventHandler {
override fun connected() {
override fun writeable() {
logger.info("ICE connected")
transceiver.setOutgoingPacketHandler(object : PacketHandler {
override fun processPacket(packetInfo: PacketInfo) {
packetInfo.addEvent(SRTP_QUEUE_ENTRY_EVENT)
outgoingSrtpPacketQueue.add(packetInfo)
}
})
TaskPools.IO_POOL.execute(iceTransport::startReadingData)
TaskPools.IO_POOL.execute(dtlsTransport::startDtlsHandshake)
}

override fun connected() {
}

override fun failed() {
}

Expand Down
14 changes: 14 additions & 0 deletions jvb/src/main/kotlin/org/jitsi/videobridge/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package org.jitsi.videobridge

import org.eclipse.jetty.servlet.ServletHolder
import org.glassfish.jersey.servlet.ServletContainer
import org.ice4j.ice.harvest.AbstractUdpListener
import org.ice4j.ice.harvest.MappingCandidateHarvesters
import org.ice4j.util.Buffer
import org.jitsi.config.JitsiConfig
import org.jitsi.metaconfig.ConfigException
import org.jitsi.metaconfig.MetaconfigLogger
Expand All @@ -29,6 +31,8 @@ import org.jitsi.rest.createServer
import org.jitsi.rest.enableCors
import org.jitsi.rest.isEnabled
import org.jitsi.rest.servletContextHandler
import org.jitsi.rtp.Packet
import org.jitsi.rtp.rtp.RtpPacket
import org.jitsi.shutdown.ShutdownServiceImpl
import org.jitsi.utils.logging2.LoggerImpl
import org.jitsi.utils.queue.PacketQueue
Expand Down Expand Up @@ -220,6 +224,7 @@ private fun getSystemPropertyDefaults(): Map<String, String> {
}

private fun startIce4j() {
AbstractUdpListener.USE_PUSH_API = true
// Start the initialization of the mapping candidate harvesters.
// Asynchronous, because the AWS and STUN harvester may take a long
// time to initialize.
Expand All @@ -239,4 +244,13 @@ private fun setupBufferPools() {
org.jitsi.rtp.util.BufferPool.returnArray = { ByteBufferPool.returnBuffer(it) }
org.jitsi.nlj.util.BufferPool.getBuffer = { ByteBufferPool.getBuffer(it) }
org.jitsi.nlj.util.BufferPool.returnBuffer = { ByteBufferPool.returnBuffer(it) }
org.ice4j.util.BufferPool.getBuffer = { len ->
val b = ByteBufferPool.getBuffer(len)
Buffer(b, 0, b.size)
}
org.ice4j.util.BufferPool.returnBuffer = { ByteBufferPool.returnBuffer(it.buffer) }
org.ice4j.ice.harvest.AbstractUdpListener.BYTES_TO_LEAVE_AT_START_OF_PACKET =
RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET
org.ice4j.ice.harvest.AbstractUdpListener.BYTES_TO_LEAVE_AT_END_OF_PACKET =
Packet.BYTES_TO_LEAVE_AT_END_OF_PACKET
}
28 changes: 11 additions & 17 deletions jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.jitsi.videobridge.relay

import org.ice4j.util.Buffer
import org.jitsi.dcsctp4j.DcSctpMessage
import org.jitsi.dcsctp4j.ErrorKind
import org.jitsi.dcsctp4j.SendPacketStatus
Expand Down Expand Up @@ -47,7 +48,6 @@ import org.jitsi.nlj.util.LocalSsrcAssociation
import org.jitsi.nlj.util.PacketInfoQueue
import org.jitsi.nlj.util.RemoteSsrcAssociation
import org.jitsi.nlj.util.sumOf
import org.jitsi.rtp.Packet
import org.jitsi.rtp.UnparsedPacket
import org.jitsi.rtp.extensions.looksLikeRtcp
import org.jitsi.rtp.extensions.looksLikeRtp
Expand Down Expand Up @@ -220,7 +220,7 @@ class Relay @JvmOverloads constructor(
clock = clock
)

private val dtlsTransport = DtlsTransport(logger).also { it.cryptex = CryptexConfig.relay }
private val dtlsTransport = DtlsTransport(logger, id).also { it.cryptex = CryptexConfig.relay }

private var cryptex = CryptexConfig.relay

Expand Down Expand Up @@ -365,44 +365,38 @@ class Relay @JvmOverloads constructor(

private fun setupIceTransport() {
iceTransport.incomingDataHandler = object : IceTransport.IncomingDataHandler {
override fun dataReceived(data: ByteArray, offset: Int, length: Int, receivedTime: Instant) {
override fun dataReceived(buffer: Buffer) {
// DTLS data will be handled by the DtlsTransport, but SRTP data can go
// straight to the transceiver
if (looksLikeDtls(data, offset, length)) {
// DTLS transport is responsible for making its own copy, because it will manage its own
// buffers
dtlsTransport.dtlsDataReceived(data, offset, length)
if (looksLikeDtls(buffer.buffer, buffer.offset, buffer.length)) {
// DTLS transport is responsible for making its own copy, because it will manage its own buffers
dtlsTransport.enqueueBuffer(buffer)
} else {
val copy = ByteBufferPool.getBuffer(
length +
RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET +
Packet.BYTES_TO_LEAVE_AT_END_OF_PACKET
)
System.arraycopy(data, offset, copy, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, length)
val pktInfo =
RelayedPacketInfo(
UnparsedPacket(copy, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, length),
UnparsedPacket(buffer.buffer, buffer.offset, buffer.length),
meshId
).apply {
this.receivedTime = receivedTime
this.receivedTime = buffer.receivedTime
}
handleMediaPacket(pktInfo)
}
}
}
iceTransport.eventHandler = object : IceTransport.EventHandler {
override fun connected() {
override fun writeable() {
logger.info("ICE connected")
transceiver.setOutgoingPacketHandler(object : PacketHandler {
override fun processPacket(packetInfo: PacketInfo) {
packetInfo.addEvent(SRTP_QUEUE_ENTRY_EVENT)
outgoingSrtpPacketQueue.add(packetInfo)
}
})
TaskPools.IO_POOL.execute(iceTransport::startReadingData)
TaskPools.IO_POOL.execute(dtlsTransport::startDtlsHandshake)
}

override fun connected() {}

override fun failed() {}

override fun consentUpdated(time: Instant) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@

package org.jitsi.videobridge.transport.dtls

import org.ice4j.util.Buffer
import org.jitsi.nlj.dtls.DtlsClient
import org.jitsi.nlj.dtls.DtlsServer
import org.jitsi.nlj.dtls.DtlsStack
import org.jitsi.nlj.srtp.TlsRole
import org.jitsi.nlj.util.BufferPool
import org.jitsi.utils.OrderedJsonObject
import org.jitsi.utils.logging2.Logger
import org.jitsi.utils.logging2.createChildLogger
import org.jitsi.utils.queue.PacketQueue
import org.jitsi.videobridge.util.TaskPools
import org.jitsi.xmpp.extensions.jingle.DtlsFingerprintPacketExtension
import org.jitsi.xmpp.extensions.jingle.IceUdpTransportPacketExtension
import java.util.concurrent.atomic.AtomicBoolean
Expand All @@ -39,7 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean
* be passed to the [outgoingDataHandler], which should be set by an
* interested party.
*/
class DtlsTransport(parentLogger: Logger) {
class DtlsTransport(parentLogger: Logger, id: String) {
private val logger = createChildLogger(parentLogger)

private val running = AtomicBoolean(true)
Expand All @@ -62,6 +66,26 @@ class DtlsTransport(parentLogger: Logger) {
/** Whether to advertise cryptex to peers. */
var cryptex = false

val dtlsQueue = object : PacketQueue<Buffer>(
128,
null,
"dtls-queue-$id",
{ buffer: Buffer ->
try {
dtlsDataReceived(buffer.buffer, buffer.offset, buffer.length)
true
} catch (e: Exception) {
logger.warn("Failed to handle DTLS data", e)
false
}
},
TaskPools.IO_POOL,
) {
override fun releasePacket(buffer: Buffer) {
BufferPool.returnBuffer(buffer.buffer)
}
}

/**
* The DTLS stack instance
*/
Expand Down Expand Up @@ -170,10 +194,13 @@ class DtlsTransport(parentLogger: Logger) {
}
}

fun enqueueBuffer(buffer: Buffer) = dtlsQueue.add(buffer)

/**
* Notify this layer that DTLS data has been received from the network
*/
fun dtlsDataReceived(data: ByteArray, off: Int, len: Int) = dtlsStack.processIncomingProtocolData(data, off, len)
private fun dtlsDataReceived(data: ByteArray, off: Int, len: Int) =
dtlsStack.processIncomingProtocolData(data, off, len)

/**
* Send out DTLS data
Expand Down
Loading
Loading