Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert TCC packets to use Instant and Duration #2219

Merged
merged 1 commit into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import org.jitsi.nlj.util.DataSize
import org.jitsi.nlj.util.NEVER
import org.jitsi.nlj.util.Rfc3711IndexTracker
import org.jitsi.nlj.util.formatMilli
import org.jitsi.nlj.util.instantOfEpochMicro
import org.jitsi.rtp.rtcp.RtcpPacket
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.PacketReport
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.ReceivedPacketReport
Expand Down Expand Up @@ -126,7 +125,7 @@ class TransportCcEngine(

private fun tccReceived(tccPacket: RtcpFbTccPacket) {
val now = clock.instant()
var currArrivalTimestamp = instantOfEpochMicro(tccPacket.GetBaseTimeUs())
var currArrivalTimestamp = tccPacket.BaseTime()
if (remoteReferenceTime == NEVER) {
remoteReferenceTime = currArrivalTimestamp
localReferenceTime = now
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import org.jitsi.nlj.util.NEVER
import org.jitsi.nlj.util.ReadOnlyStreamInformationStore
import org.jitsi.nlj.util.Rfc3711IndexTracker
import org.jitsi.nlj.util.bytes
import org.jitsi.nlj.util.toEpochMicro
import org.jitsi.rtp.rtcp.RtcpPacket
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacketBuilder
Expand Down Expand Up @@ -182,20 +181,19 @@ class TccGeneratorNode(
mediaSourceSsrc = mediaSsrc,
feedbackPacketSeqNum = currTccSeqNum++
)
currentTccPacket.SetBase(windowStartSeq, firstEntry.value.toEpochMicro())
currentTccPacket.SetBase(windowStartSeq, firstEntry.value)

var nextSequenceNumber = windowStartSeq
val feedbackBlockPackets = packetArrivalTimes.tailMap(windowStartSeq)
feedbackBlockPackets.forEach { (seq, timestamp) ->
val timestampUs = timestamp.toEpochMicro()
if (!currentTccPacket.AddReceivedPacket(seq, timestampUs)) {
if (!currentTccPacket.AddReceivedPacket(seq, timestamp)) {
tccPackets.add(currentTccPacket.build())
currentTccPacket = RtcpFbTccPacketBuilder(
mediaSourceSsrc = mediaSsrc,
feedbackPacketSeqNum = currTccSeqNum++
).apply {
SetBase(seq, timestampUs)
AddReceivedPacket(seq, timestampUs)
SetBase(seq, timestamp)
AddReceivedPacket(seq, timestamp)
}
}
nextSequenceNumber = seq + 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,47 +29,6 @@ fun Instant.formatMilli(): String = TimeUtils.formatTimeAsFullMillis(this.epochS

fun Duration.formatMilli(): String = TimeUtils.formatTimeAsFullMillis(this.seconds, this.nano)

/**
* Converts this instant to the number of microseconds from the epoch
* of 1970-01-01T00:00:00Z.
*
* If this instant represents a point on the time-line too far in the future
* or past to fit in a [Long] microseconds, then an exception is thrown.
*
* If this instant has greater than microsecond precision, then the conversion
* will drop any excess precision information as though the amount in nanoseconds
* was subject to integer division by one thousand.
*
* @return the number of microseconds since the epoch of 1970-01-01T00:00:00Z
* @throws ArithmeticException if numeric overflow occurs
*/
fun Instant.toEpochMicro(): Long {
return if (this.epochSecond < 0 && this.nano > 0) {
val micros = Math.multiplyExact(this.epochSecond + 1, 1000_000L)
val adjustment: Long = (this.nano / 1000 - 1000_000).toLong()
Math.addExact(micros, adjustment)
} else {
val micros = Math.multiplyExact(this.epochSecond, 1000_000L)
Math.addExact(micros, (this.nano / 1000).toLong())
}
}

/**
* Obtains an instance of [Instant] using microseconds from the
* epoch of 1970-01-01T00:00:00Z.
* <p>
* The seconds and nanoseconds are extracted from the specified milliseconds.
*
* @param epochMicro the number of microseconds from 1970-01-01T00:00:00Z
* @return an instant, not null
* @throws DateTimeException if the instant exceeds the maximum or minimum instant
*/
fun instantOfEpochMicro(epochMicro: Long): Instant {
val secs = Math.floorDiv(epochMicro, 1000_000L)
val micros = Math.floorMod(epochMicro, 1000_000L)
return Instant.ofEpochSecond(secs, micros * 1000L)
}

fun <T> Iterable<T>.sumOf(selector: (T) -> Duration): Duration {
var sum: Duration = Duration.ZERO
for (element in this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.jitsi.nlj.resources.logging.StdoutLogger
import org.jitsi.nlj.rtp.bandwidthestimation.BandwidthEstimator
import org.jitsi.nlj.util.bytes
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacketBuilder
import org.jitsi.utils.instantOfEpochMicro
import org.jitsi.utils.time.FakeClock
import java.util.logging.Level

Expand Down Expand Up @@ -58,11 +59,11 @@ class TransportCcEngineTest : FunSpec() {
transportCcEngine.mediaPacketSent(4, 1300.bytes)

val tccPacket = with(RtcpFbTccPacketBuilder(mediaSourceSsrc = 123, feedbackPacketSeqNum = 0)) {
SetBase(1, 100)
AddReceivedPacket(1, 100)
AddReceivedPacket(2, 110)
AddReceivedPacket(3, 120)
AddReceivedPacket(4, 130)
SetBase(1, instantOfEpochMicro(100))
AddReceivedPacket(1, instantOfEpochMicro(100))
AddReceivedPacket(2, instantOfEpochMicro(110))
AddReceivedPacket(3, instantOfEpochMicro(120))
AddReceivedPacket(4, instantOfEpochMicro(130))
build()
}

Expand All @@ -81,15 +82,15 @@ class TransportCcEngineTest : FunSpec() {
transportCcEngine.mediaPacketSent(4, 1300.bytes)

val tccPacket = with(RtcpFbTccPacketBuilder(mediaSourceSsrc = 123, feedbackPacketSeqNum = 1)) {
SetBase(4, 130)
AddReceivedPacket(4, 130)
SetBase(4, instantOfEpochMicro(130))
AddReceivedPacket(4, instantOfEpochMicro(130))
build()
}
transportCcEngine.rtcpPacketReceived(tccPacket, clock.instant())

val tccPacket2 = with(RtcpFbTccPacketBuilder(mediaSourceSsrc = 123, feedbackPacketSeqNum = 2)) {
SetBase(4, 130)
AddReceivedPacket(4, 130)
SetBase(4, instantOfEpochMicro(130))
AddReceivedPacket(4, instantOfEpochMicro(130))
build()
}
transportCcEngine.rtcpPacketReceived(tccPacket2, clock.instant())
Expand All @@ -108,8 +109,8 @@ class TransportCcEngineTest : FunSpec() {
transportCcEngine.mediaPacketSent(5, 1300.bytes)

val tccPacket = with(RtcpFbTccPacketBuilder(mediaSourceSsrc = 123, feedbackPacketSeqNum = 1)) {
SetBase(4, 130)
AddReceivedPacket(5, 130)
SetBase(4, instantOfEpochMicro(130))
AddReceivedPacket(5, instantOfEpochMicro(130))
build()
}
transportCcEngine.rtcpPacketReceived(tccPacket, clock.instant())
Expand All @@ -118,8 +119,8 @@ class TransportCcEngineTest : FunSpec() {
lossListener.numLost shouldBe 1

val tccPacket2 = with(RtcpFbTccPacketBuilder(mediaSourceSsrc = 123, feedbackPacketSeqNum = 2)) {
SetBase(4, 130)
AddReceivedPacket(4, 130)
SetBase(4, instantOfEpochMicro(130))
AddReceivedPacket(4, instantOfEpochMicro(130))
build()
}

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<kotlin.version>2.0.0</kotlin.version>
<kotest.version>5.9.1</kotest.version>
<junit.version>5.10.2</junit.version>
<jitsi.utils.version>1.0-132-g83984af</jitsi.utils.version>
<jitsi.utils.version>1.0-133-g6af1020</jitsi.utils.version>
<jicoco.version>1.1-141-g30ec741</jicoco.version>
<mockk.version>1.13.11</mockk.version>
<ktlint-maven-plugin.version>3.2.0</ktlint-maven-plugin.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companio
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kDeltaScaleFactor
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kMaxReportedPackets
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kMaxSizeBytes
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kTimeWrapPeriodUs
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kTimeWrapPeriod
import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kTransportFeedbackHeaderSizeBytes
import org.jitsi.rtp.rtp.RtpSequenceNumber
import org.jitsi.rtp.rtp.toRtpSequenceNumber
Expand All @@ -39,8 +39,12 @@ import org.jitsi.rtp.util.RtpUtils
import org.jitsi.rtp.util.get3BytesAsInt
import org.jitsi.rtp.util.getByteAsInt
import org.jitsi.rtp.util.getShortAsInt
import org.jitsi.utils.micros
import org.jitsi.utils.times
import org.jitsi.utils.toEpochMicro
import org.jitsi.utils.toMicros
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.time.Instant

sealed class PacketReport(val seqNum: Int)

Expand All @@ -52,7 +56,7 @@ typealias DeltaSize = Int

class ReceivedPacketReport(seqNum: Int, val deltaTicks: Short) : PacketReport(seqNum) {
val deltaDuration: Duration
get() = Duration.of(deltaTicks * 250L, ChronoUnit.MICROS)
get() = deltaTicks.toInt() * kDeltaScaleFactor
}

/**
Expand Down Expand Up @@ -95,23 +99,25 @@ class RtcpFbTccPacketBuilder(

// The size of the entire packet, in bytes
private var size_bytes_ = kTransportFeedbackHeaderSizeBytes
private var last_timestamp_us_: Long = 0
private var last_timestamp_: Instant = Instant.EPOCH
private val packets_ = mutableListOf<PacketReport>()

fun SetBase(base_sequence: Int, ref_timestamp_us: Long) {
fun SetBase(base_sequence: Int, ref_timestamp: Instant) {
base_seq_no_ = base_sequence.toRtpSequenceNumber()
base_time_ticks_ = (ref_timestamp_us % kTimeWrapPeriodUs) / kBaseScaleFactor
last_timestamp_us_ = GetBaseTimeUs()
base_time_ticks_ = (ref_timestamp.toEpochMicro() % kTimeWrapPeriod.toMicros()) / kBaseScaleFactor.toMicros()
last_timestamp_ = BaseTime()
}

fun AddReceivedPacket(seqNum: Int, timestamp_us: Long): Boolean {
fun AddReceivedPacket(seqNum: Int, timestamp: Instant): Boolean {
val sequence_number = seqNum.toRtpSequenceNumber()
var delta_full = (timestamp_us - last_timestamp_us_) % kTimeWrapPeriodUs
if (delta_full > kTimeWrapPeriodUs / 2) {
delta_full -= kTimeWrapPeriodUs
var delta_full = Duration.between(last_timestamp_, timestamp).toMicros() % kTimeWrapPeriod.toMicros()
if (delta_full > kTimeWrapPeriod.toMicros() / 2) {
delta_full -= kTimeWrapPeriod.toMicros()
delta_full -= kDeltaScaleFactor.toMicros() / 2
} else {
delta_full += kDeltaScaleFactor.toMicros() / 2
}
delta_full += if (delta_full < 0) -(kDeltaScaleFactor / 2) else kDeltaScaleFactor / 2
delta_full /= kDeltaScaleFactor
delta_full /= kDeltaScaleFactor.toMicros()

val delta = delta_full.toShort()
// If larger than 16bit signed, we can't represent it - need new fb packet.
Expand All @@ -137,13 +143,15 @@ class RtcpFbTccPacketBuilder(
}

packets_.add(ReceivedPacketReport(sequence_number.value, delta))
last_timestamp_us_ += delta * kDeltaScaleFactor
last_timestamp_ += delta.toInt() * kDeltaScaleFactor
size_bytes_ += delta_size

return true
}

fun GetBaseTimeUs(): Long = base_time_ticks_ * kBaseScaleFactor
fun BaseTime(): Instant {
return Instant.EPOCH + base_time_ticks_ * kBaseScaleFactor
}

private fun AddDeltaSize(deltaSize: DeltaSize): Boolean {
if (num_seq_no_ == kMaxReportedPackets) {
Expand Down Expand Up @@ -295,7 +303,7 @@ class RtcpFbTccPacket(
val encoded_chunks_: MutableList<Chunk>,
var last_chunk_: LastChunk,
var num_seq_no_: Int,
var last_timestamp_us_: Long,
var last_timestamp_: Instant,
val packets_: MutableList<PacketReport>
)

Expand All @@ -305,7 +313,7 @@ class RtcpFbTccPacket(
val encoded_chunks_ = mutableListOf<Chunk>()
val last_chunk_ = LastChunk()
val num_seq_no_: Int
var last_timestamp_us_: Long = 0
var last_timestamp_: Instant = Instant.EPOCH
val packets_ = mutableListOf<PacketReport>()

val base_time_ticks_ = getReferenceTimeTicks(buffer, offset)
Expand Down Expand Up @@ -343,13 +351,13 @@ class RtcpFbTccPacket(
1 -> {
val delta = buffer[index]
packets_.add(ReceivedPacketReport(seq_no.value, delta.toPositiveShort()))
last_timestamp_us_ += delta * kDeltaScaleFactor
last_timestamp_ += delta.toInt() * kDeltaScaleFactor
index += delta_size
}
2 -> {
val delta = buffer.getShortAsInt(index)
packets_.add(ReceivedPacketReport(seq_no.value, delta.toShort()))
last_timestamp_us_ += delta * kDeltaScaleFactor
last_timestamp_ += delta * kDeltaScaleFactor
index += delta_size
}
3 -> {
Expand All @@ -376,7 +384,7 @@ class RtcpFbTccPacket(
encoded_chunks_,
last_chunk_,
num_seq_no_,
last_timestamp_us_,
last_timestamp_,
packets_
)
}
Expand All @@ -401,10 +409,10 @@ class RtcpFbTccPacket(
}
private val packets_: MutableList<PacketReport>
get() = data.packets_
private var last_timestamp_us_: Long
get() = data.last_timestamp_us_
private var last_timestamp_: Instant
get() = data.last_timestamp_
set(value) {
data.last_timestamp_us_ = value
data.last_timestamp_ = value
}

// The reference time, in ticks.
Expand All @@ -416,7 +424,9 @@ class RtcpFbTccPacket(

val feedbackSeqNum: Int = getFeedbackPacketCount(buffer, offset)

fun GetBaseTimeUs(): Long = base_time_ticks_ * kBaseScaleFactor
fun BaseTime(): Instant {
return Instant.EPOCH + base_time_ticks_ * kBaseScaleFactor
}

override fun iterator(): Iterator<PacketReport> = packets_.iterator()

Expand All @@ -426,7 +436,7 @@ class RtcpFbTccPacket(
const val FMT = 15

// Convert to multiples of 0.25ms
const val kDeltaScaleFactor = 250
val kDeltaScaleFactor = 250.micros

// Maximum number of packets_ (including missing) TransportFeedback can report.
const val kMaxReportedPackets = 0xFFFF
Expand All @@ -442,11 +452,11 @@ class RtcpFbTccPacket(
const val kTransportFeedbackHeaderSizeBytes = 4 + 8 + 8

// Used to convert from microseconds to multiples of 64ms
const val kBaseScaleFactor = kDeltaScaleFactor * (1 shl 8)
val kBaseScaleFactor = kDeltaScaleFactor * (1 shl 8)

// The reference time field is 24 bits and are represented as multiples of 64ms
// When the reference time field would need to wrap around
const val kTimeWrapPeriodUs: Long = (1 shl 24).toLong() * kBaseScaleFactor
val kTimeWrapPeriod = (1 shl 24).toLong() * kBaseScaleFactor

const val BASE_SEQ_NUM_OFFSET = RtcpFbPacket.HEADER_SIZE
const val PACKET_STATUS_COUNT_OFFSET = RtcpFbPacket.HEADER_SIZE + 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.beInstanceOf
import org.jitsi.rtp.rtcp.RtcpHeaderBuilder
import org.jitsi.rtp.util.byteBufferOf
import org.jitsi.utils.instantOfEpochMicro
import org.jitsi.utils.micros
import java.time.Duration

class RtcpFbTccPacketTest : ShouldSpec() {
Expand Down Expand Up @@ -177,23 +179,23 @@ class RtcpFbTccPacketTest : ShouldSpec() {
mediaSourceSsrc = 2397376430,
feedbackPacketSeqNum = 162
)
rtcpFbTccPacketBuilder.SetBase(6227, 107784064)
rtcpFbTccPacketBuilder.AddReceivedPacket(6228, 107784064) shouldBe true
rtcpFbTccPacketBuilder.SetBase(6227, instantOfEpochMicro(107784064))
rtcpFbTccPacketBuilder.AddReceivedPacket(6228, instantOfEpochMicro(107784064)) shouldBe true
}
context("Creating and parsing an RtcpFbTccPacket") {
context("with missing packets") {
val kBaseSeqNo = 1000
val kBaseTimestampUs = 10000L
val kBaseTimestamp = instantOfEpochMicro(10000L)
val rtcpFbTccPacketBuilder = RtcpFbTccPacketBuilder(
rtcpHeader = RtcpHeaderBuilder(
senderSsrc = 839852602
),
mediaSourceSsrc = 2397376430,
feedbackPacketSeqNum = 163
)
rtcpFbTccPacketBuilder.SetBase(kBaseSeqNo, kBaseTimestampUs)
rtcpFbTccPacketBuilder.AddReceivedPacket(kBaseSeqNo + 0, kBaseTimestampUs)
rtcpFbTccPacketBuilder.AddReceivedPacket(kBaseSeqNo + 3, kBaseTimestampUs + 2000)
rtcpFbTccPacketBuilder.SetBase(kBaseSeqNo, kBaseTimestamp)
rtcpFbTccPacketBuilder.AddReceivedPacket(kBaseSeqNo + 0, kBaseTimestamp)
rtcpFbTccPacketBuilder.AddReceivedPacket(kBaseSeqNo + 3, kBaseTimestamp + 2000.micros)

val coded = rtcpFbTccPacketBuilder.build()

Expand Down
Loading