diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt index 1b6a3c7bab..b6a659a72b 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt @@ -16,6 +16,7 @@ package org.jitsi.videobridge +import org.ice4j.util.Buffer import org.jitsi.config.JitsiConfig import org.jitsi.dcsctp4j.DcSctpMessage import org.jitsi.dcsctp4j.ErrorKind @@ -45,13 +46,11 @@ import org.jitsi.nlj.util.NEVER import org.jitsi.nlj.util.PacketInfoQueue import org.jitsi.nlj.util.RemoteSsrcAssociation import org.jitsi.nlj.util.sumOf -import org.jitsi.rtp.Packet import org.jitsi.rtp.UnparsedPacket import org.jitsi.rtp.rtcp.RtcpSrPacket import org.jitsi.rtp.rtcp.rtcpfb.RtcpFbPacket import org.jitsi.rtp.rtcp.rtcpfb.payload_specific_fb.RtcpFbFirPacket import org.jitsi.rtp.rtcp.rtcpfb.payload_specific_fb.RtcpFbPliPacket -import org.jitsi.rtp.rtp.RtpPacket import org.jitsi.utils.MediaType import org.jitsi.utils.concurrent.RecurringRunnableExecutor import org.jitsi.utils.logging2.Logger @@ -166,7 +165,7 @@ class Endpoint @JvmOverloads constructor( /* TODO: do we ever want to support useUniquePort for an Endpoint? */ private val iceTransport = IceTransport(id, iceControlling, false, supportsPrivateAddresses, logger) - private val dtlsTransport = DtlsTransport(logger).also { it.cryptex = CryptexConfig.endpoint } + private val dtlsTransport = DtlsTransport(logger, id).also { it.cryptex = CryptexConfig.endpoint } private var cryptex: Boolean = CryptexConfig.endpoint @@ -407,30 +406,21 @@ class Endpoint @JvmOverloads constructor( private fun setupIceTransport() { iceTransport.incomingDataHandler = object : IceTransport.IncomingDataHandler { - override fun dataReceived(data: ByteArray, offset: Int, length: Int, receivedTime: Instant) { - // DTLS data will be handled by the DtlsTransport, but SRTP data can go - // straight to the transceiver - if (looksLikeDtls(data, offset, length)) { - // DTLS transport is responsible for making its own copy, because it will manage its own - // buffers - dtlsTransport.dtlsDataReceived(data, offset, length) + override fun dataReceived(buffer: Buffer) { + if (looksLikeDtls(buffer.buffer, buffer.offset, buffer.length)) { + // DTLS transport is responsible for making its own copy, because it will manage its own buffers + dtlsTransport.enqueueBuffer(buffer) } else { - val copy = ByteBufferPool.getBuffer( - length + - RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET + - Packet.BYTES_TO_LEAVE_AT_END_OF_PACKET - ) - System.arraycopy(data, offset, copy, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, length) val pktInfo = - PacketInfo(UnparsedPacket(copy, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, length)).apply { - this.receivedTime = receivedTime + PacketInfo(UnparsedPacket(buffer.buffer, buffer.offset, buffer.length)).apply { + this.receivedTime = buffer.receivedTime } transceiver.handleIncomingPacket(pktInfo) } } } iceTransport.eventHandler = object : IceTransport.EventHandler { - override fun connected() { + override fun writeable() { logger.info("ICE connected") transceiver.setOutgoingPacketHandler(object : PacketHandler { override fun processPacket(packetInfo: PacketInfo) { @@ -438,10 +428,12 @@ class Endpoint @JvmOverloads constructor( outgoingSrtpPacketQueue.add(packetInfo) } }) - TaskPools.IO_POOL.execute(iceTransport::startReadingData) TaskPools.IO_POOL.execute(dtlsTransport::startDtlsHandshake) } + override fun connected() { + } + override fun failed() { } diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/Main.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/Main.kt index ec623884ae..07b4e4d668 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/Main.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/Main.kt @@ -18,7 +18,9 @@ package org.jitsi.videobridge import org.eclipse.jetty.servlet.ServletHolder import org.glassfish.jersey.servlet.ServletContainer +import org.ice4j.ice.harvest.AbstractUdpListener import org.ice4j.ice.harvest.MappingCandidateHarvesters +import org.ice4j.util.Buffer import org.jitsi.config.JitsiConfig import org.jitsi.metaconfig.ConfigException import org.jitsi.metaconfig.MetaconfigLogger @@ -29,6 +31,8 @@ import org.jitsi.rest.createServer import org.jitsi.rest.enableCors import org.jitsi.rest.isEnabled import org.jitsi.rest.servletContextHandler +import org.jitsi.rtp.Packet +import org.jitsi.rtp.rtp.RtpPacket import org.jitsi.shutdown.ShutdownServiceImpl import org.jitsi.utils.logging2.LoggerImpl import org.jitsi.utils.queue.PacketQueue @@ -220,6 +224,7 @@ private fun getSystemPropertyDefaults(): Map { } private fun startIce4j() { + AbstractUdpListener.USE_PUSH_API = true // Start the initialization of the mapping candidate harvesters. // Asynchronous, because the AWS and STUN harvester may take a long // time to initialize. @@ -239,4 +244,13 @@ private fun setupBufferPools() { org.jitsi.rtp.util.BufferPool.returnArray = { ByteBufferPool.returnBuffer(it) } org.jitsi.nlj.util.BufferPool.getBuffer = { ByteBufferPool.getBuffer(it) } org.jitsi.nlj.util.BufferPool.returnBuffer = { ByteBufferPool.returnBuffer(it) } + org.ice4j.util.BufferPool.getBuffer = { len -> + val b = ByteBufferPool.getBuffer(len) + Buffer(b, 0, b.size) + } + org.ice4j.util.BufferPool.returnBuffer = { ByteBufferPool.returnBuffer(it.buffer) } + org.ice4j.ice.harvest.AbstractUdpListener.BYTES_TO_LEAVE_AT_START_OF_PACKET = + RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET + org.ice4j.ice.harvest.AbstractUdpListener.BYTES_TO_LEAVE_AT_END_OF_PACKET = + Packet.BYTES_TO_LEAVE_AT_END_OF_PACKET } diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt index 2199ac57f6..af848eaf05 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/relay/Relay.kt @@ -15,6 +15,7 @@ */ package org.jitsi.videobridge.relay +import org.ice4j.util.Buffer import org.jitsi.dcsctp4j.DcSctpMessage import org.jitsi.dcsctp4j.ErrorKind import org.jitsi.dcsctp4j.SendPacketStatus @@ -47,7 +48,6 @@ import org.jitsi.nlj.util.LocalSsrcAssociation import org.jitsi.nlj.util.PacketInfoQueue import org.jitsi.nlj.util.RemoteSsrcAssociation import org.jitsi.nlj.util.sumOf -import org.jitsi.rtp.Packet import org.jitsi.rtp.UnparsedPacket import org.jitsi.rtp.extensions.looksLikeRtcp import org.jitsi.rtp.extensions.looksLikeRtp @@ -220,7 +220,7 @@ class Relay @JvmOverloads constructor( clock = clock ) - private val dtlsTransport = DtlsTransport(logger).also { it.cryptex = CryptexConfig.relay } + private val dtlsTransport = DtlsTransport(logger, id).also { it.cryptex = CryptexConfig.relay } private var cryptex = CryptexConfig.relay @@ -365,33 +365,26 @@ class Relay @JvmOverloads constructor( private fun setupIceTransport() { iceTransport.incomingDataHandler = object : IceTransport.IncomingDataHandler { - override fun dataReceived(data: ByteArray, offset: Int, length: Int, receivedTime: Instant) { + override fun dataReceived(buffer: Buffer) { // DTLS data will be handled by the DtlsTransport, but SRTP data can go // straight to the transceiver - if (looksLikeDtls(data, offset, length)) { - // DTLS transport is responsible for making its own copy, because it will manage its own - // buffers - dtlsTransport.dtlsDataReceived(data, offset, length) + if (looksLikeDtls(buffer.buffer, buffer.offset, buffer.length)) { + // DTLS transport is responsible for making its own copy, because it will manage its own buffers + dtlsTransport.enqueueBuffer(buffer) } else { - val copy = ByteBufferPool.getBuffer( - length + - RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET + - Packet.BYTES_TO_LEAVE_AT_END_OF_PACKET - ) - System.arraycopy(data, offset, copy, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, length) val pktInfo = RelayedPacketInfo( - UnparsedPacket(copy, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, length), + UnparsedPacket(buffer.buffer, buffer.offset, buffer.length), meshId ).apply { - this.receivedTime = receivedTime + this.receivedTime = buffer.receivedTime } handleMediaPacket(pktInfo) } } } iceTransport.eventHandler = object : IceTransport.EventHandler { - override fun connected() { + override fun writeable() { logger.info("ICE connected") transceiver.setOutgoingPacketHandler(object : PacketHandler { override fun processPacket(packetInfo: PacketInfo) { @@ -399,10 +392,11 @@ class Relay @JvmOverloads constructor( outgoingSrtpPacketQueue.add(packetInfo) } }) - TaskPools.IO_POOL.execute(iceTransport::startReadingData) TaskPools.IO_POOL.execute(dtlsTransport::startDtlsHandshake) } + override fun connected() {} + override fun failed() {} override fun consentUpdated(time: Instant) { diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/transport/dtls/DtlsTransport.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/transport/dtls/DtlsTransport.kt index ed9c2276a0..e6db0a8293 100644 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/transport/dtls/DtlsTransport.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/transport/dtls/DtlsTransport.kt @@ -16,13 +16,17 @@ package org.jitsi.videobridge.transport.dtls +import org.ice4j.util.Buffer import org.jitsi.nlj.dtls.DtlsClient import org.jitsi.nlj.dtls.DtlsServer import org.jitsi.nlj.dtls.DtlsStack import org.jitsi.nlj.srtp.TlsRole +import org.jitsi.nlj.util.BufferPool import org.jitsi.utils.OrderedJsonObject import org.jitsi.utils.logging2.Logger import org.jitsi.utils.logging2.createChildLogger +import org.jitsi.utils.queue.PacketQueue +import org.jitsi.videobridge.util.TaskPools import org.jitsi.xmpp.extensions.jingle.DtlsFingerprintPacketExtension import org.jitsi.xmpp.extensions.jingle.IceUdpTransportPacketExtension import java.util.concurrent.atomic.AtomicBoolean @@ -39,7 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean * be passed to the [outgoingDataHandler], which should be set by an * interested party. */ -class DtlsTransport(parentLogger: Logger) { +class DtlsTransport(parentLogger: Logger, id: String) { private val logger = createChildLogger(parentLogger) private val running = AtomicBoolean(true) @@ -62,6 +66,26 @@ class DtlsTransport(parentLogger: Logger) { /** Whether to advertise cryptex to peers. */ var cryptex = false + val dtlsQueue = object : PacketQueue( + 128, + null, + "dtls-queue-$id", + { buffer: Buffer -> + try { + dtlsDataReceived(buffer.buffer, buffer.offset, buffer.length) + true + } catch (e: Exception) { + logger.warn("Failed to handle DTLS data", e) + false + } + }, + TaskPools.IO_POOL, + ) { + override fun releasePacket(buffer: Buffer) { + BufferPool.returnBuffer(buffer.buffer) + } + } + /** * The DTLS stack instance */ @@ -170,10 +194,13 @@ class DtlsTransport(parentLogger: Logger) { } } + fun enqueueBuffer(buffer: Buffer) = dtlsQueue.add(buffer) + /** * Notify this layer that DTLS data has been received from the network */ - fun dtlsDataReceived(data: ByteArray, off: Int, len: Int) = dtlsStack.processIncomingProtocolData(data, off, len) + private fun dtlsDataReceived(data: ByteArray, off: Int, len: Int) = + dtlsStack.processIncomingProtocolData(data, off, len) /** * Send out DTLS data diff --git a/jvb/src/main/kotlin/org/jitsi/videobridge/transport/ice/IceTransport.kt b/jvb/src/main/kotlin/org/jitsi/videobridge/transport/ice/IceTransport.kt index 9f7ae36245..b33681dce2 100755 --- a/jvb/src/main/kotlin/org/jitsi/videobridge/transport/ice/IceTransport.kt +++ b/jvb/src/main/kotlin/org/jitsi/videobridge/transport/ice/IceTransport.kt @@ -27,7 +27,10 @@ import org.ice4j.ice.IceProcessingState import org.ice4j.ice.LocalCandidate import org.ice4j.ice.RemoteCandidate import org.ice4j.ice.harvest.MappingCandidateHarvesters -import org.ice4j.socket.SocketClosedException +import org.ice4j.util.Buffer +import org.ice4j.util.BufferHandler +import org.jitsi.rtp.Packet +import org.jitsi.rtp.rtp.RtpPacket import org.jitsi.utils.OrderedJsonObject import org.jitsi.utils.logging2.Logger import org.jitsi.utils.logging2.cdebug @@ -36,6 +39,8 @@ import org.jitsi.videobridge.ice.Harvesters import org.jitsi.videobridge.ice.IceConfig import org.jitsi.videobridge.ice.TransportUtils import org.jitsi.videobridge.metrics.VideobridgeMetricsContainer +import org.jitsi.videobridge.util.ByteBufferPool +import org.jitsi.videobridge.util.TaskPools import org.jitsi.xmpp.extensions.jingle.CandidatePacketExtension import org.jitsi.xmpp.extensions.jingle.IceCandidatePacketExtension import org.jitsi.xmpp.extensions.jingle.IceRtcpmuxPacketExtension @@ -62,7 +67,7 @@ class IceTransport @JvmOverloads constructor( * Whether the ICE agent created by this transport should use * unique local ports, rather than the configured port. */ - useUniquePort: Boolean, + val useUniquePort: Boolean, /** * Use private addresses for this [IceTransport] even if [IceConfig.advertisePrivateCandidates] is false. */ @@ -73,14 +78,9 @@ class IceTransport @JvmOverloads constructor( private val logger = createChildLogger(parentLogger) /** - * The handler which will be invoked when data is received. The handler - * does *not* own the buffer passed to it, so a copy must be made if it wants - * to use the data after the handler call finishes. This field should be - * set by some other entity which wishes to handle the incoming data + * The handler which will be invoked when data is received. + * This field should be set by some other entity which wishes to handle the incoming data * received over the ICE connection. - * NOTE: we don't create a packet in [IceTransport] because - * RTP packets want space before and after and [IceTransport] - * has no notion of what kind of data is contained within the buffer. */ @JvmField var incomingDataHandler: IncomingDataHandler? = null @@ -94,6 +94,13 @@ class IceTransport @JvmOverloads constructor( @JvmField var eventHandler: EventHandler? = null + /** + * Whether or not it is possible to write to this [IceTransport]. + * + * This happens as soon as any candidate pair is validated, and happens (usually) before iceConnected. + */ + private val iceWriteable = AtomicBoolean(false) + /** * Whether or not this [IceTransport] has connected. */ @@ -106,6 +113,8 @@ class IceTransport @JvmOverloads constructor( fun hasFailed(): Boolean = iceFailed.get() + fun isWriteable(): Boolean = iceWriteable.get() + fun isConnected(): Boolean = iceConnected.get() /** @@ -135,7 +144,16 @@ class IceTransport @JvmOverloads constructor( addPairChangeListener(iceStreamPairChangedListener) } - private val iceComponent = iceAgent.createComponent(iceStream, IceConfig.config.keepAliveStrategy, true) + private val iceComponent = iceAgent.createComponent(iceStream, IceConfig.config.keepAliveStrategy, false).apply { + setBufferCallback(object : BufferHandler { + override fun handleBuffer(buffer: Buffer) { + incomingDataHandler?.dataReceived(buffer) ?: run { + packetStats.numIncomingPacketsDroppedNoHandler.increment() + ByteBufferPool.returnBuffer(buffer.buffer) + } + } + }) + } private val packetStats = PacketStats() val icePassword: String get() = iceAgent.localPassword @@ -204,7 +222,7 @@ class IceTransport @JvmOverloads constructor( fun startReadingData() { logger.cdebug { "Starting to read incoming data" } - val socket = iceComponent.socket + val socket = iceComponent.selectedPair.iceSocketWrapper val receiveBuf = ByteArray(1500) val packet = DatagramPacket(receiveBuf, 0, receiveBuf.size) var receivedTime: Instant @@ -213,16 +231,25 @@ class IceTransport @JvmOverloads constructor( try { socket.receive(packet) receivedTime = clock.instant() - } catch (e: SocketClosedException) { - logger.info("Socket closed, stopping reader") - break } catch (e: IOException) { logger.warn("Stopping reader", e) break } packetStats.numPacketsReceived.increment() try { - incomingDataHandler?.dataReceived(receiveBuf, packet.offset, packet.length, receivedTime) ?: run { + val b = ByteBufferPool.getBuffer( + RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET + packet.length + Packet.BYTES_TO_LEAVE_AT_END_OF_PACKET + ) + System.arraycopy( + packet.data, + packet.offset, + b, + RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, + packet.length + ) + val buffer = Buffer(b, RtpPacket.BYTES_TO_LEAVE_AT_START_OF_PACKET, packet.length, receivedTime) + + incomingDataHandler?.dataReceived(buffer) ?: run { logger.cdebug { "Data handler is null, dropping data" } packetStats.numIncomingPacketsDroppedNoHandler.increment() } @@ -239,7 +266,7 @@ class IceTransport @JvmOverloads constructor( fun send(data: ByteArray, off: Int, length: Int) { if (running.get()) { try { - iceComponent.socket.send(DatagramPacket(data, off, length)) + iceComponent.send(data, off, length) packetStats.numPacketsSent.increment() } catch (e: IOException) { logger.error("Error sending packet", e) @@ -264,6 +291,7 @@ class IceTransport @JvmOverloads constructor( put("nominationStrategy", IceConfig.config.nominationStrategy.toString()) put("advertisePrivateCandidates", IceConfig.config.advertisePrivateCandidates) put("closed", !running.get()) + put("iceWriteable", iceWriteable.get()) put("iceConnected", iceConnected.get()) put("iceFailed", iceFailed.get()) putAll(packetStats.toJson()) @@ -344,6 +372,13 @@ class IceTransport @JvmOverloads constructor( transition.completed() -> { if (iceConnected.compareAndSet(false, true)) { eventHandler?.connected() + if (useUniquePort) { + // ice4j's push API only works with the single port harvester. With unique ports we still need + // to read from the socket. + TaskPools.IO_POOL.submit { + startReadingData() + } + } if (iceComponent.selectedPair.remoteCandidate.type == CandidateType.RELAYED_CANDIDATE || iceComponent.selectedPair.localCandidate.type == CandidateType.RELAYED_CANDIDATE ) { @@ -375,7 +410,11 @@ class IceTransport @JvmOverloads constructor( } private fun iceStreamPairChanged(ev: PropertyChangeEvent) { - if (IceMediaStream.PROPERTY_PAIR_CONSENT_FRESHNESS_CHANGED == ev.propertyName) { + if (IceMediaStream.PROPERTY_PAIR_VALIDATED == ev.propertyName) { + if (iceWriteable.compareAndSet(false, true)) { + eventHandler?.writeable() + } + } else if (IceMediaStream.PROPERTY_PAIR_CONSENT_FRESHNESS_CHANGED == ev.propertyName) { /* TODO: Currently ice4j only triggers this event for the selected * pair, but should we double-check the pair anyway? */ @@ -435,10 +474,15 @@ class IceTransport @JvmOverloads constructor( * Notify the handler that data was received (contained * within [data] at [offset] with [length]) at [receivedTime] */ - fun dataReceived(data: ByteArray, offset: Int, length: Int, receivedTime: Instant) + fun dataReceived(buffer: Buffer) } interface EventHandler { + /** + * Notify the event handler that it is possible to write to the ICE stack + */ + fun writeable() + /** * Notify the event handler that ICE connected successfully */ diff --git a/pom.xml b/pom.xml index 3a00efafcb..814911e8a9 100644 --- a/pom.xml +++ b/pom.xml @@ -106,7 +106,7 @@ ${project.groupId} ice4j - 3.0-72-g824cd4b + 3.2-0-g63cddcd ${project.groupId}