From 70c9ea60f7ddddecbfd0a7d8e72cae084608e418 Mon Sep 17 00:00:00 2001 From: Boris Grozev Date: Thu, 8 Aug 2024 17:02:50 -0500 Subject: [PATCH 1/5] feat: Use the ice4j push API. --- .../kotlin/org/jitsi/videobridge/Endpoint.kt | 26 +++----- .../main/kotlin/org/jitsi/videobridge/Main.kt | 12 ++++ .../org/jitsi/videobridge/relay/Relay.kt | 20 +++---- .../videobridge/transport/ice/IceTransport.kt | 59 +++++++++++++------ jvb/src/main/resources/application.conf | 1 + 5 files changed, 70 insertions(+), 48 deletions(-) diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt index 1b6a3c7bab..f0a45ccaee 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt @@ -16,6 +16,7 @@ package org.jitsi.videobridge +import org.ice4j.util.Buffer import org.jitsi.config.JitsiConfig import org.jitsi.dcsctp4j.DcSctpMessage import org.jitsi.dcsctp4j.ErrorKind @@ -45,13 +46,11 @@ import org.jitsi.nlj.util.NEVER import org.jitsi.nlj.util.PacketInfoQueue import org.jitsi.nlj.util.RemoteSsrcAssociation import org.jitsi.nlj.util.sumOf -import org.jitsi.rtp.Packet import org.jitsi.rtp.UnparsedPacket import org.jitsi.rtp.rtcp.RtcpSrPacket import org.jitsi.rtp.rtcp.rtcpfb.RtcpFbPacket import org.jitsi.rtp.rtcp.rtcpfb.payload_specific_fb.RtcpFbFirPacket import org.jitsi.rtp.rtcp.rtcpfb.payload_specific_fb.RtcpFbPliPacket -import org.jitsi.rtp.rtp.RtpPacket import org.jitsi.utils.MediaType import org.jitsi.utils.concurrent.RecurringRunnableExecutor import org.jitsi.utils.logging2.Logger @@ -407,23 +406,15 @@ class Endpoint @JvmOverloads constructor( private fun setupIceTransport() { iceTransport.incomingDataHandler = object : IceTransport.IncomingDataHandler { - override fun dataReceived(data: ByteArray, offset: Int, length: Int, receivedTime: Instant) { - // DTLS data will be handled by the DtlsTransport, but SRTP data can go - // straight to the transceiver - if (looksLikeDtls(data, offset, length)) { - // DTLS transport is responsible for making its own copy, because it will manage its own - // buffers - dtlsTransport.dtlsDataReceived(data, offset, length) + override fun dataReceived(buffer: Buffer) { + if (looksLikeDtls(buffer.buffer, buffer.offset, buffer.length)) { + // DTLS transport is responsible for making its own copy, because it will manage its own buffers + // TODO: place on a queue, we can't risk blocking the ice4j thread. + dtlsTransport.dtlsDataReceived(buffer.buffer, buffer.offset, buffer.length) } else { - val copy = ByteBufferPool.getBuffer( - length + - RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET + - Packet.BYTES_TO_LEAVE_AT_END_OF_PACKET - ) - System.arraycopy(data, offset, copy, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, length) val pktInfo = - PacketInfo(UnparsedPacket(copy, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, length)).apply { - this.receivedTime = receivedTime + PacketInfo(UnparsedPacket(buffer.buffer, buffer.offset, buffer.length)).apply { + this.receivedTime = buffer.receivedTime } transceiver.handleIncomingPacket(pktInfo) } @@ -438,7 +429,6 @@ class Endpoint @JvmOverloads constructor( outgoingSrtpPacketQueue.add(packetInfo) } }) - TaskPools.IO_POOL.execute(iceTransport::startReadingData) TaskPools.IO_POOL.execute(dtlsTransport::startDtlsHandshake) } diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/Main.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/Main.kt index ec623884ae..a218fdb2ca 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/Main.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/Main.kt @@ -19,6 +19,7 @@ package org.jitsi.videobridge import org.eclipse.jetty.servlet.ServletHolder import org.glassfish.jersey.servlet.ServletContainer import org.ice4j.ice.harvest.MappingCandidateHarvesters +import org.ice4j.util.Buffer import org.jitsi.config.JitsiConfig import org.jitsi.metaconfig.ConfigException import org.jitsi.metaconfig.MetaconfigLogger @@ -29,6 +30,8 @@ import org.jitsi.rest.createServer import org.jitsi.rest.enableCors import org.jitsi.rest.isEnabled import org.jitsi.rest.servletContextHandler +import org.jitsi.rtp.Packet +import org.jitsi.rtp.rtp.RtpPacket import org.jitsi.shutdown.ShutdownServiceImpl import org.jitsi.utils.logging2.LoggerImpl import org.jitsi.utils.queue.PacketQueue @@ -239,4 +242,13 @@ private fun setupBufferPools() { org.jitsi.rtp.util.BufferPool.returnArray = { ByteBufferPool.returnBuffer(it) } org.jitsi.nlj.util.BufferPool.getBuffer = { ByteBufferPool.getBuffer(it) } org.jitsi.nlj.util.BufferPool.returnBuffer = { ByteBufferPool.returnBuffer(it) } + org.ice4j.util.BufferPool.getBuffer = { len -> + val b = ByteBufferPool.getBuffer(len) + Buffer(b, 0, b.size) + } + org.ice4j.util.BufferPool.returnBuffer = { ByteBufferPool.returnBuffer(it.buffer) } + org.ice4j.ice.harvest.AbstractUdpListener.BYTES_TO_LEAVE_AT_START_OF_PACKET = + RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET + org.ice4j.ice.harvest.AbstractUdpListener.BYTES_TO_LEAVE_AT_END_OF_PACKET = + Packet.BYTES_TO_LEAVE_AT_END_OF_PACKET } diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt index 2199ac57f6..7c3c37ce45 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt @@ -15,6 +15,7 @@ */ package org.jitsi.videobridge.relay +import org.ice4j.util.Buffer import org.jitsi.dcsctp4j.DcSctpMessage import org.jitsi.dcsctp4j.ErrorKind import org.jitsi.dcsctp4j.SendPacketStatus @@ -47,7 +48,6 @@ import org.jitsi.nlj.util.LocalSsrcAssociation import org.jitsi.nlj.util.PacketInfoQueue import org.jitsi.nlj.util.RemoteSsrcAssociation import org.jitsi.nlj.util.sumOf -import org.jitsi.rtp.Packet import org.jitsi.rtp.UnparsedPacket import org.jitsi.rtp.extensions.looksLikeRtcp import org.jitsi.rtp.extensions.looksLikeRtp @@ -365,26 +365,21 @@ class Relay @JvmOverloads constructor( private fun setupIceTransport() { iceTransport.incomingDataHandler = object : IceTransport.IncomingDataHandler { - override fun dataReceived(data: ByteArray, offset: Int, length: Int, receivedTime: Instant) { + override fun dataReceived(buffer: Buffer) { // DTLS data will be handled by the DtlsTransport, but SRTP data can go // straight to the transceiver - if (looksLikeDtls(data, offset, length)) { + if (looksLikeDtls(buffer.buffer, buffer.offset, buffer.length)) { // DTLS transport is responsible for making its own copy, because it will manage its own // buffers - dtlsTransport.dtlsDataReceived(data, offset, length) + // TODO: place on a queue, we can't risk blocking the ice4j thread. + dtlsTransport.dtlsDataReceived(buffer.buffer, buffer.offset, buffer.length) } else { - val copy = ByteBufferPool.getBuffer( - length + - RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET + - Packet.BYTES_TO_LEAVE_AT_END_OF_PACKET - ) - System.arraycopy(data, offset, copy, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, length) val pktInfo = RelayedPacketInfo( - UnparsedPacket(copy, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, length), + UnparsedPacket(buffer.buffer, buffer.offset, buffer.length), meshId ).apply { - this.receivedTime = receivedTime + this.receivedTime = buffer.receivedTime } handleMediaPacket(pktInfo) } @@ -399,7 +394,6 @@ class Relay @JvmOverloads constructor( outgoingSrtpPacketQueue.add(packetInfo) } }) - TaskPools.IO_POOL.execute(iceTransport::startReadingData) TaskPools.IO_POOL.execute(dtlsTransport::startDtlsHandshake) } diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/transport/ice/IceTransport.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/transport/ice/IceTransport.kt index 9f7ae36245..56b5fda1c2 100755 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/transport/ice/IceTransport.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/transport/ice/IceTransport.kt @@ -27,7 +27,10 @@ import org.ice4j.ice.IceProcessingState import org.ice4j.ice.LocalCandidate import org.ice4j.ice.RemoteCandidate import org.ice4j.ice.harvest.MappingCandidateHarvesters -import org.ice4j.socket.SocketClosedException +import org.ice4j.util.Buffer +import org.ice4j.util.BufferHandler +import org.jitsi.rtp.Packet +import org.jitsi.rtp.rtp.RtpPacket import org.jitsi.utils.OrderedJsonObject import org.jitsi.utils.logging2.Logger import org.jitsi.utils.logging2.cdebug @@ -36,6 +39,8 @@ import org.jitsi.videobridge.ice.Harvesters import org.jitsi.videobridge.ice.IceConfig import org.jitsi.videobridge.ice.TransportUtils import org.jitsi.videobridge.metrics.VideobridgeMetricsContainer +import org.jitsi.videobridge.util.ByteBufferPool +import org.jitsi.videobridge.util.TaskPools import org.jitsi.xmpp.extensions.jingle.CandidatePacketExtension import org.jitsi.xmpp.extensions.jingle.IceCandidatePacketExtension import org.jitsi.xmpp.extensions.jingle.IceRtcpmuxPacketExtension @@ -62,7 +67,7 @@ class IceTransport @JvmOverloads constructor( * Whether the ICE agent created by this transport should use * unique local ports, rather than the configured port. */ - useUniquePort: Boolean, + val useUniquePort: Boolean, /** * Use private addresses for this [IceTransport] even if [IceConfig.advertisePrivateCandidates] is false. */ @@ -73,14 +78,9 @@ class IceTransport @JvmOverloads constructor( private val logger = createChildLogger(parentLogger) /** - * The handler which will be invoked when data is received. The handler - * does *not* own the buffer passed to it, so a copy must be made if it wants - * to use the data after the handler call finishes. This field should be - * set by some other entity which wishes to handle the incoming data + * The handler which will be invoked when data is received. + * This field should be set by some other entity which wishes to handle the incoming data * received over the ICE connection. - * NOTE: we don't create a packet in [IceTransport] because - * RTP packets want space before and after and [IceTransport] - * has no notion of what kind of data is contained within the buffer. */ @JvmField var incomingDataHandler: IncomingDataHandler? = null @@ -135,7 +135,16 @@ class IceTransport @JvmOverloads constructor( addPairChangeListener(iceStreamPairChangedListener) } - private val iceComponent = iceAgent.createComponent(iceStream, IceConfig.config.keepAliveStrategy, true) + private val iceComponent = iceAgent.createComponent(iceStream, IceConfig.config.keepAliveStrategy, false).apply { + setBufferCallback(object : BufferHandler { + override fun handleBuffer(buffer: Buffer) { + incomingDataHandler?.dataReceived(buffer) ?: run { + packetStats.numIncomingPacketsDroppedNoHandler.increment() + ByteBufferPool.returnBuffer(buffer.buffer) + } + } + }) + } private val packetStats = PacketStats() val icePassword: String get() = iceAgent.localPassword @@ -204,7 +213,7 @@ class IceTransport @JvmOverloads constructor( fun startReadingData() { logger.cdebug { "Starting to read incoming data" } - val socket = iceComponent.socket + val socket = iceComponent.selectedPair.iceSocketWrapper val receiveBuf = ByteArray(1500) val packet = DatagramPacket(receiveBuf, 0, receiveBuf.size) var receivedTime: Instant @@ -213,16 +222,25 @@ class IceTransport @JvmOverloads constructor( try { socket.receive(packet) receivedTime = clock.instant() - } catch (e: SocketClosedException) { - logger.info("Socket closed, stopping reader") - break } catch (e: IOException) { logger.warn("Stopping reader", e) break } packetStats.numPacketsReceived.increment() try { - incomingDataHandler?.dataReceived(receiveBuf, packet.offset, packet.length, receivedTime) ?: run { + val b = ByteBufferPool.getBuffer( + RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET + packet.length + Packet.BYTES_TO_LEAVE_AT_END_OF_PACKET + ) + System.arraycopy( + packet.data, + packet.offset, + b, + RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, + packet.length + ) + val buffer = Buffer(b, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, packet.length, receivedTime) + + incomingDataHandler?.dataReceived(buffer) ?: run { logger.cdebug { "Data handler is null, dropping data" } packetStats.numIncomingPacketsDroppedNoHandler.increment() } @@ -239,7 +257,7 @@ class IceTransport @JvmOverloads constructor( fun send(data: ByteArray, off: Int, length: Int) { if (running.get()) { try { - iceComponent.socket.send(DatagramPacket(data, off, length)) + iceComponent.send(data, off, length) packetStats.numPacketsSent.increment() } catch (e: IOException) { logger.error("Error sending packet", e) @@ -344,6 +362,13 @@ class IceTransport @JvmOverloads constructor( transition.completed() -> { if (iceConnected.compareAndSet(false, true)) { eventHandler?.connected() + if (useUniquePort) { + // ice4j's push API only works with the single port harvester. With unique ports we still need + // to read from the socket. + TaskPools.IO_POOL.submit { + startReadingData() + } + } if (iceComponent.selectedPair.remoteCandidate.type == CandidateType.RELAYED_CANDIDATE || iceComponent.selectedPair.localCandidate.type == CandidateType.RELAYED_CANDIDATE ) { @@ -435,7 +460,7 @@ class IceTransport @JvmOverloads constructor( * Notify the handler that data was received (contained * within [data] at [offset] with [length]) at [receivedTime] */ - fun dataReceived(data: ByteArray, offset: Int, length: Int, receivedTime: Instant) + fun dataReceived(buffer: Buffer) } interface EventHandler { diff --git a/jvb/src/main/resources/application.conf b/jvb/src/main/resources/application.conf index 09d391d298..4c3facbfc2 100644 --- a/jvb/src/main/resources/application.conf +++ b/jvb/src/main/resources/application.conf @@ -1,5 +1,6 @@ # This file contains overrides for libraries JVB uses ice4j { + use-push-api = true consent-freshness { // Sends "consent freshness" check every 5 seconds. interval = 5 seconds From 6bf3c052ad15cc865114d09c7a003dcaae8bcaa7 Mon Sep 17 00:00:00 2001 From: Boris Grozev Date: Thu, 8 Aug 2024 17:40:49 -0500 Subject: [PATCH 2/5] feat: Put DTLS packets on a queue. --- .../kotlin/org/jitsi/videobridge/Endpoint.kt | 5 ++- .../org/jitsi/videobridge/relay/Relay.kt | 8 ++--- .../transport/dtls/DtlsTransport.kt | 31 +++++++++++++++++-- 3 files changed, 34 insertions(+), 10 deletions(-) diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt index f0a45ccaee..70ee9593ff 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt @@ -165,7 +165,7 @@ class Endpoint @JvmOverloads constructor( /* TODO: do we ever want to support useUniquePort for an Endpoint? */ private val iceTransport = IceTransport(id, iceControlling, false, supportsPrivateAddresses, logger) - private val dtlsTransport = DtlsTransport(logger).also { it.cryptex = CryptexConfig.endpoint } + private val dtlsTransport = DtlsTransport(logger, id).also { it.cryptex = CryptexConfig.endpoint } private var cryptex: Boolean = CryptexConfig.endpoint @@ -409,8 +409,7 @@ class Endpoint @JvmOverloads constructor( override fun dataReceived(buffer: Buffer) { if (looksLikeDtls(buffer.buffer, buffer.offset, buffer.length)) { // DTLS transport is responsible for making its own copy, because it will manage its own buffers - // TODO: place on a queue, we can't risk blocking the ice4j thread. - dtlsTransport.dtlsDataReceived(buffer.buffer, buffer.offset, buffer.length) + dtlsTransport.enqueueBuffer(buffer) } else { val pktInfo = PacketInfo(UnparsedPacket(buffer.buffer, buffer.offset, buffer.length)).apply { diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt index 7c3c37ce45..2a5095f279 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt @@ -220,7 +220,7 @@ class Relay @JvmOverloads constructor( clock = clock ) - private val dtlsTransport = DtlsTransport(logger).also { it.cryptex = CryptexConfig.relay } + private val dtlsTransport = DtlsTransport(logger, id).also { it.cryptex = CryptexConfig.relay } private var cryptex = CryptexConfig.relay @@ -369,10 +369,8 @@ class Relay @JvmOverloads constructor( // DTLS data will be handled by the DtlsTransport, but SRTP data can go // straight to the transceiver if (looksLikeDtls(buffer.buffer, buffer.offset, buffer.length)) { - // DTLS transport is responsible for making its own copy, because it will manage its own - // buffers - // TODO: place on a queue, we can't risk blocking the ice4j thread. - dtlsTransport.dtlsDataReceived(buffer.buffer, buffer.offset, buffer.length) + // DTLS transport is responsible for making its own copy, because it will manage its own buffers + dtlsTransport.enqueueBuffer(buffer) } else { val pktInfo = RelayedPacketInfo( diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/transport/dtls/DtlsTransport.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/transport/dtls/DtlsTransport.kt index ed9c2276a0..e6db0a8293 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/transport/dtls/DtlsTransport.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/transport/dtls/DtlsTransport.kt @@ -16,13 +16,17 @@ package org.jitsi.videobridge.transport.dtls +import org.ice4j.util.Buffer import org.jitsi.nlj.dtls.DtlsClient import org.jitsi.nlj.dtls.DtlsServer import org.jitsi.nlj.dtls.DtlsStack import org.jitsi.nlj.srtp.TlsRole +import org.jitsi.nlj.util.BufferPool import org.jitsi.utils.OrderedJsonObject import org.jitsi.utils.logging2.Logger import org.jitsi.utils.logging2.createChildLogger +import org.jitsi.utils.queue.PacketQueue +import org.jitsi.videobridge.util.TaskPools import org.jitsi.xmpp.extensions.jingle.DtlsFingerprintPacketExtension import org.jitsi.xmpp.extensions.jingle.IceUdpTransportPacketExtension import java.util.concurrent.atomic.AtomicBoolean @@ -39,7 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean * be passed to the [outgoingDataHandler], which should be set by an * interested party. */ -class DtlsTransport(parentLogger: Logger) { +class DtlsTransport(parentLogger: Logger, id: String) { private val logger = createChildLogger(parentLogger) private val running = AtomicBoolean(true) @@ -62,6 +66,26 @@ class DtlsTransport(parentLogger: Logger) { /** Whether to advertise cryptex to peers. */ var cryptex = false + val dtlsQueue = object : PacketQueue( + 128, + null, + "dtls-queue-$id", + { buffer: Buffer -> + try { + dtlsDataReceived(buffer.buffer, buffer.offset, buffer.length) + true + } catch (e: Exception) { + logger.warn("Failed to handle DTLS data", e) + false + } + }, + TaskPools.IO_POOL, + ) { + override fun releasePacket(buffer: Buffer) { + BufferPool.returnBuffer(buffer.buffer) + } + } + /** * The DTLS stack instance */ @@ -170,10 +194,13 @@ class DtlsTransport(parentLogger: Logger) { } } + fun enqueueBuffer(buffer: Buffer) = dtlsQueue.add(buffer) + /** * Notify this layer that DTLS data has been received from the network */ - fun dtlsDataReceived(data: ByteArray, off: Int, len: Int) = dtlsStack.processIncomingProtocolData(data, off, len) + private fun dtlsDataReceived(data: ByteArray, off: Int, len: Int) = + dtlsStack.processIncomingProtocolData(data, off, len) /** * Send out DTLS data From 8ec0b6234c3be0a928475f0c9dcc82111683b4a9 Mon Sep 17 00:00:00 2001 From: Boris Grozev Date: Wed, 18 Sep 2024 16:29:42 -0500 Subject: [PATCH 3/5] squash: Configure via AbstractUdpListener. --- jvb/src/main/kotlin/org/jitsi/videobridge/Main.kt | 2 ++ jvb/src/main/resources/application.conf | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/Main.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/Main.kt index a218fdb2ca..07b4e4d668 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/Main.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/Main.kt @@ -18,6 +18,7 @@ package org.jitsi.videobridge import org.eclipse.jetty.servlet.ServletHolder import org.glassfish.jersey.servlet.ServletContainer +import org.ice4j.ice.harvest.AbstractUdpListener import org.ice4j.ice.harvest.MappingCandidateHarvesters import org.ice4j.util.Buffer import org.jitsi.config.JitsiConfig @@ -223,6 +224,7 @@ private fun getSystemPropertyDefaults(): Map { } private fun startIce4j() { + AbstractUdpListener.USE_PUSH_API = true // Start the initialization of the mapping candidate harvesters. // Asynchronous, because the AWS and STUN harvester may take a long // time to initialize. diff --git a/jvb/src/main/resources/application.conf b/jvb/src/main/resources/application.conf index 4c3facbfc2..09d391d298 100644 --- a/jvb/src/main/resources/application.conf +++ b/jvb/src/main/resources/application.conf @@ -1,6 +1,5 @@ # This file contains overrides for libraries JVB uses ice4j { - use-push-api = true consent-freshness { // Sends "consent freshness" check every 5 seconds. interval = 5 seconds From 2b3f0786bde9356e2b3ab6421fef7e23db261ec7 Mon Sep 17 00:00:00 2001 From: Jonathan Lennox Date: Tue, 1 Oct 2024 17:18:19 -0400 Subject: [PATCH 4/5] Start writing to the ICE transport as soon as we have any validated pair. --- .../kotlin/org/jitsi/videobridge/Endpoint.kt | 5 ++++- .../org/jitsi/videobridge/relay/Relay.kt | 4 +++- .../videobridge/transport/ice/IceTransport.kt | 21 ++++++++++++++++++- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt index 70ee9593ff..b6a659a72b 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt @@ -420,7 +420,7 @@ class Endpoint @JvmOverloads constructor( } } iceTransport.eventHandler = object : IceTransport.EventHandler { - override fun connected() { + override fun writeable() { logger.info("ICE connected") transceiver.setOutgoingPacketHandler(object : PacketHandler { override fun processPacket(packetInfo: PacketInfo) { @@ -431,6 +431,9 @@ class Endpoint @JvmOverloads constructor( TaskPools.IO_POOL.execute(dtlsTransport::startDtlsHandshake) } + override fun connected() { + } + override fun failed() { } diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt index 2a5095f279..af848eaf05 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt @@ -384,7 +384,7 @@ class Relay @JvmOverloads constructor( } } iceTransport.eventHandler = object : IceTransport.EventHandler { - override fun connected() { + override fun writeable() { logger.info("ICE connected") transceiver.setOutgoingPacketHandler(object : PacketHandler { override fun processPacket(packetInfo: PacketInfo) { @@ -395,6 +395,8 @@ class Relay @JvmOverloads constructor( TaskPools.IO_POOL.execute(dtlsTransport::startDtlsHandshake) } + override fun connected() {} + override fun failed() {} override fun consentUpdated(time: Instant) { diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/transport/ice/IceTransport.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/transport/ice/IceTransport.kt index 56b5fda1c2..b33681dce2 100755 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/transport/ice/IceTransport.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/transport/ice/IceTransport.kt @@ -94,6 +94,13 @@ class IceTransport @JvmOverloads constructor( @JvmField var eventHandler: EventHandler? = null + /** + * Whether or not it is possible to write to this [IceTransport]. + * + * This happens as soon as any candidate pair is validated, and happens (usually) before iceConnected. + */ + private val iceWriteable = AtomicBoolean(false) + /** * Whether or not this [IceTransport] has connected. */ @@ -106,6 +113,8 @@ class IceTransport @JvmOverloads constructor( fun hasFailed(): Boolean = iceFailed.get() + fun isWriteable(): Boolean = iceWriteable.get() + fun isConnected(): Boolean = iceConnected.get() /** @@ -282,6 +291,7 @@ class IceTransport @JvmOverloads constructor( put("nominationStrategy", IceConfig.config.nominationStrategy.toString()) put("advertisePrivateCandidates", IceConfig.config.advertisePrivateCandidates) put("closed", !running.get()) + put("iceWriteable", iceWriteable.get()) put("iceConnected", iceConnected.get()) put("iceFailed", iceFailed.get()) putAll(packetStats.toJson()) @@ -400,7 +410,11 @@ class IceTransport @JvmOverloads constructor( } private fun iceStreamPairChanged(ev: PropertyChangeEvent) { - if (IceMediaStream.PROPERTY_PAIR_CONSENT_FRESHNESS_CHANGED == ev.propertyName) { + if (IceMediaStream.PROPERTY_PAIR_VALIDATED == ev.propertyName) { + if (iceWriteable.compareAndSet(false, true)) { + eventHandler?.writeable() + } + } else if (IceMediaStream.PROPERTY_PAIR_CONSENT_FRESHNESS_CHANGED == ev.propertyName) { /* TODO: Currently ice4j only triggers this event for the selected * pair, but should we double-check the pair anyway? */ @@ -464,6 +478,11 @@ class IceTransport @JvmOverloads constructor( } interface EventHandler { + /** + * Notify the event handler that it is possible to write to the ICE stack + */ + fun writeable() + /** * Notify the event handler that ICE connected successfully */ From 8f0be2edaabf8666863f6136030d446aa945b271 Mon Sep 17 00:00:00 2001 From: Boris Grozev Date: Wed, 2 Oct 2024 12:59:21 -0500 Subject: [PATCH 5/5] chore: Bump to ice4j 3.2 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 3a00efafcb..814911e8a9 100644 --- a/pom.xml +++ b/pom.xml @@ -106,7 +106,7 @@ ${project.groupId} ice4j - 3.0-72-g824cd4b + 3.2-0-g63cddcd ${project.groupId}