Skip to content

Commit

Permalink
Convert TCC packets to use Instant and Duration (#2219)
Browse files Browse the repository at this point in the history
Rather than integer microseconds.
  • Loading branch information
JonathanLennox authored Sep 17, 2024
1 parent f625842 commit d3f5db4
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import org.jitsi.nlj.util.DataSize
import org.jitsi.nlj.util.NEVER
import org.jitsi.nlj.util.Rfc3711IndexTracker
import org.jitsi.nlj.util.formatMilli
import org.jitsi.nlj.util.instantOfEpochMicro
import org.jitsi.rtp.rtcp.RtcpPacket
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.PacketReport
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.ReceivedPacketReport
Expand Down Expand Up @@ -126,7 +125,7 @@ class TransportCcEngine(

private fun tccReceived(tccPacket: RtcpFbTccPacket) {
val now = clock.instant()
var currArrivalTimestamp = instantOfEpochMicro(tccPacket.GetBaseTimeUs())
var currArrivalTimestamp = tccPacket.BaseTime()
if (remoteReferenceTime == NEVER) {
remoteReferenceTime = currArrivalTimestamp
localReferenceTime = now
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import org.jitsi.nlj.util.NEVER
import org.jitsi.nlj.util.ReadOnlyStreamInformationStore
import org.jitsi.nlj.util.Rfc3711IndexTracker
import org.jitsi.nlj.util.bytes
import org.jitsi.nlj.util.toEpochMicro
import org.jitsi.rtp.rtcp.RtcpPacket
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacketBuilder
Expand Down Expand Up @@ -182,20 +181,19 @@ class TccGeneratorNode(
mediaSourceSsrc = mediaSsrc,
feedbackPacketSeqNum = currTccSeqNum++
)
currentTccPacket.SetBase(windowStartSeq, firstEntry.value.toEpochMicro())
currentTccPacket.SetBase(windowStartSeq, firstEntry.value)

var nextSequenceNumber = windowStartSeq
val feedbackBlockPackets = packetArrivalTimes.tailMap(windowStartSeq)
feedbackBlockPackets.forEach { (seq, timestamp) ->
val timestampUs = timestamp.toEpochMicro()
if (!currentTccPacket.AddReceivedPacket(seq, timestampUs)) {
if (!currentTccPacket.AddReceivedPacket(seq, timestamp)) {
tccPackets.add(currentTccPacket.build())
currentTccPacket = RtcpFbTccPacketBuilder(
mediaSourceSsrc = mediaSsrc,
feedbackPacketSeqNum = currTccSeqNum++
).apply {
SetBase(seq, timestampUs)
AddReceivedPacket(seq, timestampUs)
SetBase(seq, timestamp)
AddReceivedPacket(seq, timestamp)
}
}
nextSequenceNumber = seq + 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,47 +29,6 @@ fun Instant.formatMilli(): String = TimeUtils.formatTimeAsFullMillis(this.epochS

fun Duration.formatMilli(): String = TimeUtils.formatTimeAsFullMillis(this.seconds, this.nano)

/**
* Converts this instant to the number of microseconds from the epoch
* of 1970-01-01T00:00:00Z.
*
* If this instant represents a point on the time-line too far in the future
* or past to fit in a [Long] microseconds, then an exception is thrown.
*
* If this instant has greater than microsecond precision, then the conversion
* will drop any excess precision information as though the amount in nanoseconds
* was subject to integer division by one thousand.
*
* @return the number of microseconds since the epoch of 1970-01-01T00:00:00Z
* @throws ArithmeticException if numeric overflow occurs
*/
fun Instant.toEpochMicro(): Long {
return if (this.epochSecond < 0 && this.nano > 0) {
val micros = Math.multiplyExact(this.epochSecond + 1, 1000_000L)
val adjustment: Long = (this.nano / 1000 - 1000_000).toLong()
Math.addExact(micros, adjustment)
} else {
val micros = Math.multiplyExact(this.epochSecond, 1000_000L)
Math.addExact(micros, (this.nano / 1000).toLong())
}
}

/**
* Obtains an instance of [Instant] using microseconds from the
* epoch of 1970-01-01T00:00:00Z.
* <p>
* The seconds and nanoseconds are extracted from the specified milliseconds.
*
* @param epochMicro the number of microseconds from 1970-01-01T00:00:00Z
* @return an instant, not null
* @throws DateTimeException if the instant exceeds the maximum or minimum instant
*/
fun instantOfEpochMicro(epochMicro: Long): Instant {
val secs = Math.floorDiv(epochMicro, 1000_000L)
val micros = Math.floorMod(epochMicro, 1000_000L)
return Instant.ofEpochSecond(secs, micros * 1000L)
}

fun <T> Iterable<T>.sumOf(selector: (T) -> Duration): Duration {
var sum: Duration = Duration.ZERO
for (element in this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.jitsi.nlj.resources.logging.StdoutLogger
import org.jitsi.nlj.rtp.bandwidthestimation.BandwidthEstimator
import org.jitsi.nlj.util.bytes
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacketBuilder
import org.jitsi.utils.instantOfEpochMicro
import org.jitsi.utils.time.FakeClock
import java.util.logging.Level

Expand Down Expand Up @@ -58,11 +59,11 @@ class TransportCcEngineTest : FunSpec() {
transportCcEngine.mediaPacketSent(4, 1300.bytes)

val tccPacket = with(RtcpFbTccPacketBuilder(mediaSourceSsrc = 123, feedbackPacketSeqNum = 0)) {
SetBase(1, 100)
AddReceivedPacket(1, 100)
AddReceivedPacket(2, 110)
AddReceivedPacket(3, 120)
AddReceivedPacket(4, 130)
SetBase(1, instantOfEpochMicro(100))
AddReceivedPacket(1, instantOfEpochMicro(100))
AddReceivedPacket(2, instantOfEpochMicro(110))
AddReceivedPacket(3, instantOfEpochMicro(120))
AddReceivedPacket(4, instantOfEpochMicro(130))
build()
}

Expand All @@ -81,15 +82,15 @@ class TransportCcEngineTest : FunSpec() {
transportCcEngine.mediaPacketSent(4, 1300.bytes)

val tccPacket = with(RtcpFbTccPacketBuilder(mediaSourceSsrc = 123, feedbackPacketSeqNum = 1)) {
SetBase(4, 130)
AddReceivedPacket(4, 130)
SetBase(4, instantOfEpochMicro(130))
AddReceivedPacket(4, instantOfEpochMicro(130))
build()
}
transportCcEngine.rtcpPacketReceived(tccPacket, clock.instant())

val tccPacket2 = with(RtcpFbTccPacketBuilder(mediaSourceSsrc = 123, feedbackPacketSeqNum = 2)) {
SetBase(4, 130)
AddReceivedPacket(4, 130)
SetBase(4, instantOfEpochMicro(130))
AddReceivedPacket(4, instantOfEpochMicro(130))
build()
}
transportCcEngine.rtcpPacketReceived(tccPacket2, clock.instant())
Expand All @@ -108,8 +109,8 @@ class TransportCcEngineTest : FunSpec() {
transportCcEngine.mediaPacketSent(5, 1300.bytes)

val tccPacket = with(RtcpFbTccPacketBuilder(mediaSourceSsrc = 123, feedbackPacketSeqNum = 1)) {
SetBase(4, 130)
AddReceivedPacket(5, 130)
SetBase(4, instantOfEpochMicro(130))
AddReceivedPacket(5, instantOfEpochMicro(130))
build()
}
transportCcEngine.rtcpPacketReceived(tccPacket, clock.instant())
Expand All @@ -118,8 +119,8 @@ class TransportCcEngineTest : FunSpec() {
lossListener.numLost shouldBe 1

val tccPacket2 = with(RtcpFbTccPacketBuilder(mediaSourceSsrc = 123, feedbackPacketSeqNum = 2)) {
SetBase(4, 130)
AddReceivedPacket(4, 130)
SetBase(4, instantOfEpochMicro(130))
AddReceivedPacket(4, instantOfEpochMicro(130))
build()
}

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<kotlin.version>2.0.0</kotlin.version>
<kotest.version>5.9.1</kotest.version>
<junit.version>5.10.2</junit.version>
<jitsi.utils.version>1.0-132-g83984af</jitsi.utils.version>
<jitsi.utils.version>1.0-133-g6af1020</jitsi.utils.version>
<jicoco.version>1.1-141-g30ec741</jicoco.version>
<mockk.version>1.13.11</mockk.version>
<ktlint-maven-plugin.version>3.2.0</ktlint-maven-plugin.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companio
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kDeltaScaleFactor
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kMaxReportedPackets
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kMaxSizeBytes
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kTimeWrapPeriodUs
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kTimeWrapPeriod
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kTransportFeedbackHeaderSizeBytes
import org.jitsi.rtp.rtp.RtpSequenceNumber
import org.jitsi.rtp.rtp.toRtpSequenceNumber
Expand All @@ -39,8 +39,12 @@ import org.jitsi.rtp.util.RtpUtils
import org.jitsi.rtp.util.get3BytesAsInt
import org.jitsi.rtp.util.getByteAsInt
import org.jitsi.rtp.util.getShortAsInt
import org.jitsi.utils.micros
import org.jitsi.utils.times
import org.jitsi.utils.toEpochMicro
import org.jitsi.utils.toMicros
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.time.Instant

sealed class PacketReport(val seqNum: Int)

Expand All @@ -52,7 +56,7 @@ typealias DeltaSize = Int

class ReceivedPacketReport(seqNum: Int, val deltaTicks: Short) : PacketReport(seqNum) {
val deltaDuration: Duration
get() = Duration.of(deltaTicks * 250L, ChronoUnit.MICROS)
get() = deltaTicks.toInt() * kDeltaScaleFactor
}

/**
Expand Down Expand Up @@ -95,23 +99,25 @@ class RtcpFbTccPacketBuilder(

// The size of the entire packet, in bytes
private var size_bytes_ = kTransportFeedbackHeaderSizeBytes
private var last_timestamp_us_: Long = 0
private var last_timestamp_: Instant = Instant.EPOCH
private val packets_ = mutableListOf<PacketReport>()

fun SetBase(base_sequence: Int, ref_timestamp_us: Long) {
fun SetBase(base_sequence: Int, ref_timestamp: Instant) {
base_seq_no_ = base_sequence.toRtpSequenceNumber()
base_time_ticks_ = (ref_timestamp_us % kTimeWrapPeriodUs) / kBaseScaleFactor
last_timestamp_us_ = GetBaseTimeUs()
base_time_ticks_ = (ref_timestamp.toEpochMicro() % kTimeWrapPeriod.toMicros()) / kBaseScaleFactor.toMicros()
last_timestamp_ = BaseTime()
}

fun AddReceivedPacket(seqNum: Int, timestamp_us: Long): Boolean {
fun AddReceivedPacket(seqNum: Int, timestamp: Instant): Boolean {
val sequence_number = seqNum.toRtpSequenceNumber()
var delta_full = (timestamp_us - last_timestamp_us_) % kTimeWrapPeriodUs
if (delta_full > kTimeWrapPeriodUs / 2) {
delta_full -= kTimeWrapPeriodUs
var delta_full = Duration.between(last_timestamp_, timestamp).toMicros() % kTimeWrapPeriod.toMicros()
if (delta_full > kTimeWrapPeriod.toMicros() / 2) {
delta_full -= kTimeWrapPeriod.toMicros()
delta_full -= kDeltaScaleFactor.toMicros() / 2
} else {
delta_full += kDeltaScaleFactor.toMicros() / 2
}
delta_full += if (delta_full < 0) -(kDeltaScaleFactor / 2) else kDeltaScaleFactor / 2
delta_full /= kDeltaScaleFactor
delta_full /= kDeltaScaleFactor.toMicros()

val delta = delta_full.toShort()
// If larger than 16bit signed, we can't represent it - need new fb packet.
Expand All @@ -137,13 +143,15 @@ class RtcpFbTccPacketBuilder(
}

packets_.add(ReceivedPacketReport(sequence_number.value, delta))
last_timestamp_us_ += delta * kDeltaScaleFactor
last_timestamp_ += delta.toInt() * kDeltaScaleFactor
size_bytes_ += delta_size

return true
}

fun GetBaseTimeUs(): Long = base_time_ticks_ * kBaseScaleFactor
fun BaseTime(): Instant {
return Instant.EPOCH + base_time_ticks_ * kBaseScaleFactor
}

private fun AddDeltaSize(deltaSize: DeltaSize): Boolean {
if (num_seq_no_ == kMaxReportedPackets) {
Expand Down Expand Up @@ -295,7 +303,7 @@ class RtcpFbTccPacket(
val encoded_chunks_: MutableList<Chunk>,
var last_chunk_: LastChunk,
var num_seq_no_: Int,
var last_timestamp_us_: Long,
var last_timestamp_: Instant,
val packets_: MutableList<PacketReport>
)

Expand All @@ -305,7 +313,7 @@ class RtcpFbTccPacket(
val encoded_chunks_ = mutableListOf<Chunk>()
val last_chunk_ = LastChunk()
val num_seq_no_: Int
var last_timestamp_us_: Long = 0
var last_timestamp_: Instant = Instant.EPOCH
val packets_ = mutableListOf<PacketReport>()

val base_time_ticks_ = getReferenceTimeTicks(buffer, offset)
Expand Down Expand Up @@ -343,13 +351,13 @@ class RtcpFbTccPacket(
1 -> {
val delta = buffer[index]
packets_.add(ReceivedPacketReport(seq_no.value, delta.toPositiveShort()))
last_timestamp_us_ += delta * kDeltaScaleFactor
last_timestamp_ += delta.toInt() * kDeltaScaleFactor
index += delta_size
}
2 -> {
val delta = buffer.getShortAsInt(index)
packets_.add(ReceivedPacketReport(seq_no.value, delta.toShort()))
last_timestamp_us_ += delta * kDeltaScaleFactor
last_timestamp_ += delta * kDeltaScaleFactor
index += delta_size
}
3 -> {
Expand All @@ -376,7 +384,7 @@ class RtcpFbTccPacket(
encoded_chunks_,
last_chunk_,
num_seq_no_,
last_timestamp_us_,
last_timestamp_,
packets_
)
}
Expand All @@ -401,10 +409,10 @@ class RtcpFbTccPacket(
}
private val packets_: MutableList<PacketReport>
get() = data.packets_
private var last_timestamp_us_: Long
get() = data.last_timestamp_us_
private var last_timestamp_: Instant
get() = data.last_timestamp_
set(value) {
data.last_timestamp_us_ = value
data.last_timestamp_ = value
}

// The reference time, in ticks.
Expand All @@ -416,7 +424,9 @@ class RtcpFbTccPacket(

val feedbackSeqNum: Int = getFeedbackPacketCount(buffer, offset)

fun GetBaseTimeUs(): Long = base_time_ticks_ * kBaseScaleFactor
fun BaseTime(): Instant {
return Instant.EPOCH + base_time_ticks_ * kBaseScaleFactor
}

override fun iterator(): Iterator<PacketReport> = packets_.iterator()

Expand All @@ -426,7 +436,7 @@ class RtcpFbTccPacket(
const val FMT = 15

// Convert to multiples of 0.25ms
const val kDeltaScaleFactor = 250
val kDeltaScaleFactor = 250.micros

// Maximum number of packets_ (including missing) TransportFeedback can report.
const val kMaxReportedPackets = 0xFFFF
Expand All @@ -442,11 +452,11 @@ class RtcpFbTccPacket(
const val kTransportFeedbackHeaderSizeBytes = 4 + 8 + 8

// Used to convert from microseconds to multiples of 64ms
const val kBaseScaleFactor = kDeltaScaleFactor * (1 shl 8)
val kBaseScaleFactor = kDeltaScaleFactor * (1 shl 8)

// The reference time field is 24 bits and are represented as multiples of 64ms
// When the reference time field would need to wrap around
const val kTimeWrapPeriodUs: Long = (1 shl 24).toLong() * kBaseScaleFactor
val kTimeWrapPeriod = (1 shl 24).toLong() * kBaseScaleFactor

const val BASE_SEQ_NUM_OFFSET = RtcpFbPacket.HEADER_SIZE
const val PACKET_STATUS_COUNT_OFFSET = RtcpFbPacket.HEADER_SIZE + 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.beInstanceOf
import org.jitsi.rtp.rtcp.RtcpHeaderBuilder
import org.jitsi.rtp.util.byteBufferOf
import org.jitsi.utils.instantOfEpochMicro
import org.jitsi.utils.micros
import java.time.Duration

class RtcpFbTccPacketTest : ShouldSpec() {
Expand Down Expand Up @@ -177,23 +179,23 @@ class RtcpFbTccPacketTest : ShouldSpec() {
mediaSourceSsrc = 2397376430,
feedbackPacketSeqNum = 162
)
rtcpFbTccPacketBuilder.SetBase(6227, 107784064)
rtcpFbTccPacketBuilder.AddReceivedPacket(6228, 107784064) shouldBe true
rtcpFbTccPacketBuilder.SetBase(6227, instantOfEpochMicro(107784064))
rtcpFbTccPacketBuilder.AddReceivedPacket(6228, instantOfEpochMicro(107784064)) shouldBe true
}
context("Creating and parsing an RtcpFbTccPacket") {
context("with missing packets") {
val kBaseSeqNo = 1000
val kBaseTimestampUs = 10000L
val kBaseTimestamp = instantOfEpochMicro(10000L)
val rtcpFbTccPacketBuilder = RtcpFbTccPacketBuilder(
rtcpHeader = RtcpHeaderBuilder(
senderSsrc = 839852602
),
mediaSourceSsrc = 2397376430,
feedbackPacketSeqNum = 163
)
rtcpFbTccPacketBuilder.SetBase(kBaseSeqNo, kBaseTimestampUs)
rtcpFbTccPacketBuilder.AddReceivedPacket(kBaseSeqNo + 0, kBaseTimestampUs)
rtcpFbTccPacketBuilder.AddReceivedPacket(kBaseSeqNo + 3, kBaseTimestampUs + 2000)
rtcpFbTccPacketBuilder.SetBase(kBaseSeqNo, kBaseTimestamp)
rtcpFbTccPacketBuilder.AddReceivedPacket(kBaseSeqNo + 0, kBaseTimestamp)
rtcpFbTccPacketBuilder.AddReceivedPacket(kBaseSeqNo + 3, kBaseTimestamp + 2000.micros)

val coded = rtcpFbTccPacketBuilder.build()

Expand Down

0 comments on commit d3f5db4

Please sign in to comment.