Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log BWE stats in a compatible with Medooze stats viewer #155

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
5 changes: 5 additions & 0 deletions src/main/kotlin/org/jitsi/nlj/PacketInfo.kt
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ open class PacketInfo @JvmOverloads constructor(
*/
var shouldDiscard: Boolean = false

/**
* Whether this packet was generated to be a probing packet.
*/
var isProbing: Boolean = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i am a bit worried about PacketInfo 1) becoming a dumping ground for fields and, more fundamentally, holding data that doesn't really apply to all packets (true, marking an audio packet or sctp packet as isProbing = false is technically accurate, but it still feels a bit wrong). i don't think this change along ruins things terribly, but i worry about the precedent. i'm probably being a bit too paranoid and if we don't think it's too bad it's something we could wait to address down the line should it become a problem.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would making a VideoPacketInfo subclass be helpful?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that, but wondered what sort of guarantees we could make as to when one would actually be used. For example, would we convert it on the ingress as well when we discovered a packet was video? Or would it only be in certain places? Also, if we were going to do this we'd have to test and cast it, but at that point would it be that much different than just testing and casting the underlying packet? If not, then maybe we could just do different packet types to denote probing?


/**
* The ID of the endpoint associated with this packet (i.e. the source endpoint).
*/
Expand Down
53 changes: 27 additions & 26 deletions src/main/kotlin/org/jitsi/nlj/rtp/TransportCcEngine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ class TransportCcEngine(
private var localReferenceTime: Instant = NEVER

/**
* Holds a key value pair of the packet sequence number and an object made
* up of the packet send time and the packet size.
* Holds a key value pair of the packet sequence number and the
* packet stats.
*/
private val sentPacketDetails = LRUCache<Int, PacketDetail>(MAX_OUTGOING_PACKETS_HISTORY)
private val sentPacketStats = LRUCache<Int, BandwidthEstimator.PacketStats>(MAX_OUTGOING_PACKETS_HISTORY)

private val missingPacketDetailSeqNums = mutableListOf<Int>()
private val missingPacketStatsSeqNums = mutableListOf<Int>()

/**
* Called when an RTP sender has a new round-trip time estimate.
Expand All @@ -102,63 +102,64 @@ class TransportCcEngine(

// We have to remember the oldest known sequence number here, as we
// remove from sentPacketDetails inside this loop
val oldestKnownSeqNum = synchronized(sentPacketsSyncRoot) { sentPacketDetails.oldestEntry() }
val oldestKnownSeqNum = synchronized(sentPacketsSyncRoot) { sentPacketStats.oldestEntry() }
for (packetReport in tccPacket) {
val tccSeqNum = packetReport.seqNum
val packetDetail = synchronized(sentPacketsSyncRoot) {
sentPacketDetails.remove(tccSeqNum)
val packetStats = synchronized(sentPacketsSyncRoot) {
sentPacketStats.remove(tccSeqNum)
}

if (packetDetail == null) {
if (packetStats == null) {
if (packetReport is ReceivedPacketReport) {
missingPacketDetailSeqNums.add(tccSeqNum)
missingPacketStatsSeqNums.add(tccSeqNum)
}
continue
}

when (packetReport) {
is UnreceivedPacketReport ->
bandwidthEstimator.processPacketLoss(now, packetDetail.packetSendTime, tccSeqNum)
bandwidthEstimator.processPacketLoss(now, packetStats)

is ReceivedPacketReport -> {
currArrivalTimestamp += packetReport.deltaDuration

val arrivalTimeInLocalClock = currArrivalTimestamp - Duration.between(localReferenceTime, remoteReferenceTime)

bandwidthEstimator.processPacketArrival(
now, packetDetail.packetSendTime, arrivalTimeInLocalClock, tccSeqNum, packetDetail.packetLength)
now, packetStats, arrivalTimeInLocalClock)
}
}
}
if (missingPacketDetailSeqNums.isNotEmpty()) {
bandwidthEstimator.feedbackComplete(now)

if (missingPacketStatsSeqNums.isNotEmpty()) {
logger.warn("TCC packet contained received sequence numbers: " +
"${tccPacket.iterator().asSequence()
.filterIsInstance<ReceivedPacketReport>()
.map(PacketReport::seqNum)
.joinToString()}. " +
"Couldn't find packet detail for the seq nums: ${missingPacketDetailSeqNums.joinToString()}. " +
"Couldn't find packet stats for the seq nums: ${missingPacketStatsSeqNums.joinToString()}. " +
(oldestKnownSeqNum?.let { "Oldest known seqNum was $it." } ?: run { "Sent packet details map was empty." }))
missingPacketDetailSeqNums.clear()
missingPacketStatsSeqNums.clear()
}
}

fun mediaPacketSent(tccSeqNum: Int, length: DataSize) {
fun mediaPacketSent(
tccSeqNum: Int,
length: DataSize,
isRtx: Boolean = false,
isProbing: Boolean = false
) {
synchronized(sentPacketsSyncRoot) {
val now = clock.instant()
sentPacketDetails.put(
tccSeqNum and 0xFFFF,
PacketDetail(length, now))
val modSeq = tccSeqNum and 0xFFFF
val stats = BandwidthEstimator.PacketStats(modSeq, length, now)
stats.isRtx = isRtx
stats.isProbing = isProbing
sentPacketStats.put(modSeq, stats)
}
}

/**
* [PacketDetail] is an object that holds the
* length(size) of the packet in [packetLength]
* and the time stamps of the outgoing packet
* in [packetSendTime]
*/
private data class PacketDetail internal constructor(internal val packetLength: DataSize, internal val packetSendTime: Instant)

companion object {
/**
* The maximum number of received packets and their timestamps to save.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,31 @@ abstract class BandwidthEstimator(
/** The maximum bandwidth the estimator will return. */
abstract var maxBw: Bandwidth

/**
* Inform the bandwidth estimator about a packet that has been sent.
*/
fun processPacketTransmission(
now: Instant,
stats: PacketStats
) {
if (timeSeriesLogger.isTraceEnabled) {
val point = diagnosticContext.makeTimeSeriesPoint("bwe_packet_transmission", now)
stats.addToTimeSeriesPoint(point)
timeSeriesLogger.trace(point)
}
doProcessPacketTransmission(now, stats)
}

/**
* A subclass's implementation of [processPacketTransmission].
*
* See that function for parameter details.
*/
protected abstract fun doProcessPacketTransmission(
now: Instant,
stats: PacketStats
)

/**
* Inform the bandwidth estimator about a packet that has arrived at its
* destination.
Expand All @@ -69,43 +94,38 @@ abstract class BandwidthEstimator(
* It is possible (e.g., if feedback was lost) that neither
* [processPacketArrival] nor [processPacketLoss] is called for a given [seq].
*
* The clocks reported by [now], [sendTime], and [recvTime] do not
* The clocks reported by [now], [stats.sendTime], and [recvTime] do not
* necessarily have any relationship to each other, but must be consistent
* within themselves across all calls to functions of this [BandwidthEstimator].
*
* All arrival and loss reports based on a single feedback message should have the
* same [now] value. [feedbackComplete] should be called once all feedback reports
* based on a single feedback message have been processed.
*
* @param[now] The current time, when this function is called.
* @param[sendTime] The time the packet was sent, if known, or null.
* @param[stats] [PacketStats] about this packet that was sent.
* @param[recvTime] The time the packet was received, if known, or null.
* @param[seq] A 16-bit sequence number of packets processed by this
* [BandwidthEstimator].
* @param[size] The size of the packet.
* @param[ecn] The ECN markings with which the packet was received.
* @param[recvEcn] The ECN markings with which the packet was received.
*/
fun processPacketArrival(
now: Instant,
sendTime: Instant?,
stats: PacketStats,
recvTime: Instant?,
seq: Int,
size: DataSize,
ecn: Byte = 0
recvEcn: Byte = 0
) {
if (timeSeriesLogger.isTraceEnabled) {
val point = diagnosticContext.makeTimeSeriesPoint("bwe_packet_arrival", now)
if (sendTime != null) {
point.addField("sendTime", sendTime.formatMilli())
}
stats.addToTimeSeriesPoint(point)
if (recvTime != null) {
point.addField("recvTime", recvTime.formatMilli())
}
point.addField("seq", seq)
point.addField("size", size.bytes)
if (ecn != 0.toByte()) {
point.addField("ecn", ecn)
if (recvEcn != 0.toByte()) {
point.addField("recvEcn", recvEcn)
}
timeSeriesLogger.trace(point)
}

doProcessPacketArrival(now, sendTime, recvTime, seq, size, ecn)
doProcessPacketArrival(now, stats, recvTime, recvEcn)
}

/**
Expand All @@ -115,40 +135,45 @@ abstract class BandwidthEstimator(
*/
protected abstract fun doProcessPacketArrival(
now: Instant,
sendTime: Instant?,
stats: PacketStats,
recvTime: Instant?,
seq: Int,
size: DataSize,
ecn: Byte = 0
recvEcn: Byte = 0
)

/**
* Inform the bandwidth estimator that a packet was lost.
*
* All arrival and loss reports based on a single feedback message should have the
* same [now] value. [feedbackComplete] should be called once all feedback reports
* based on a single feedback message have been processed.
*
* @param[now] The current time, when this function is called.
* @param[sendTime] The time the packet was sent, if known, or null.
* @param[seq] A 16-bit sequence number of packets processed by this
* [BandwidthEstimator].
* @param[stats] [PacketStats] about this packet that was sent.
*/
fun processPacketLoss(now: Instant, sendTime: Instant?, seq: Int) {
fun processPacketLoss(now: Instant, stats: PacketStats) {
if (timeSeriesLogger.isTraceEnabled) {
val point = diagnosticContext.makeTimeSeriesPoint("bwe_packet_loss", now)
if (sendTime != null) {
point.addField("sendTime", sendTime.formatMilli())
}
point.addField("seq", seq)
stats.addToTimeSeriesPoint(point)
timeSeriesLogger.trace(point)
}

doProcessPacketLoss(now, sendTime, seq)
doProcessPacketLoss(now, stats)
}

/**
* A subclass's implementation of [processPacketLoss].
*
* See that function for parameter details.
*/
protected abstract fun doProcessPacketLoss(now: Instant, sendTime: Instant?, seq: Int)
protected abstract fun doProcessPacketLoss(now: Instant, stats: PacketStats)

/**
* Inform the bandwidht estimator that a block of feedback is complete.
*
* @param[now] The current time, when this function is called. This should match
* the value passed to [processPacketArrival] and [processPacketLoss].
*/
abstract fun feedbackComplete(now: Instant)

/**
* Inform the bandwidth estimator about a new round-trip time value
Expand Down Expand Up @@ -239,6 +264,45 @@ abstract class BandwidthEstimator(
listeners.remove(listener)
}

/**
* Information about a sent packet, as reported to a [BandwidthEstimator].
*/
data class PacketStats(
/**
* A 16-bit sequence number of packets processed by this
* [BandwidthEstimator].
*/
val seq: Int,
/**
* The size of the packet.
*/
val size: DataSize,
/**
* The time the packet was sent, if known, or null.
*/
val sendTime: Instant?
) {
/**
* Whether the sent packet was a retransmission
*/
var isRtx: Boolean = false

/**
* Whether the sent packet was a bandwidth probe
*/
var isProbing: Boolean = false

fun addToTimeSeriesPoint(point: DiagnosticContext.TimeSeriesPoint) {
if (sendTime != null) {
point.addField("sendTime", sendTime.formatMilli())
}
point.addField("seq", seq)
point.addField("size", size.bytes)
point.addField("rtx", isRtx)
point.addField("probing", isProbing)
}
}

/**
* Holds a snapshot of stats specific to the bandwidth estimator.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import java.time.Duration
import java.time.Instant
import kotlin.properties.Delegates
import org.jitsi.nlj.util.Bandwidth
import org.jitsi.nlj.util.DataSize
import org.jitsi.nlj.util.bps
import org.jitsi.nlj.util.createChildLogger
import org.jitsi.nlj.util.kbps
Expand Down Expand Up @@ -53,21 +52,25 @@ class GoogleCcEstimator(diagnosticContext: DiagnosticContext, parentLogger: Logg
it.setMinMaxBitrate(minBw.bps.toInt(), maxBw.bps.toInt())
}

override fun doProcessPacketArrival(now: Instant, sendTime: Instant?, recvTime: Instant?, seq: Int, size: DataSize, ecn: Byte) {
if (sendTime != null && recvTime != null) {
override fun doProcessPacketTransmission(now: Instant, stats: PacketStats) {
/* Do nothing, Google CC doesn't care about packet transmission. */
}

override fun doProcessPacketArrival(now: Instant, stats: PacketStats, recvTime: Instant?, recvEcn: Byte) {
if (stats.sendTime != null && recvTime != null) {
bitrateEstimatorAbsSendTime.incomingPacketInfo(now.toEpochMilli(),
recvTime.toEpochMilli(), sendTime.toEpochMilli(), size.bytes.toInt())
recvTime.toEpochMilli(), stats.sendTime.toEpochMilli(), stats.size.bytes.toInt())
}
sendSideBandwidthEstimation.updateReceiverEstimate(bitrateEstimatorAbsSendTime.latestEstimate)
sendSideBandwidthEstimation.reportPacketArrived(now.toEpochMilli())

/* TODO: rate-limit how often we call updateEstimate? */
sendSideBandwidthEstimation.updateEstimate(now.toEpochMilli())
reportBandwidthEstimate(now, sendSideBandwidthEstimation.latestEstimate.bps)
}

override fun doProcessPacketLoss(now: Instant, sendTime: Instant?, seq: Int) {
override fun doProcessPacketLoss(now: Instant, stats: PacketStats) {
sendSideBandwidthEstimation.reportPacketLost(now.toEpochMilli())
}

override fun feedbackComplete(now: Instant) {
/* TODO: rate-limit how often we call updateEstimate? */
sendSideBandwidthEstimation.updateEstimate(now.toEpochMilli())
reportBandwidthEstimate(now, sendSideBandwidthEstimation.latestEstimate.bps)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ class ProbingDataSender(
// encapsulating packets as RTX (with the proper ssrc and payload type) so we
// just need to find the packets to retransmit and forward them to the next node
// TODO(brian): do we need to clone it here?
packetsToResend.add(PacketInfo(packet.clone()))
val packetInfo = PacketInfo(packet.clone())
packetInfo.isProbing = true
packetsToResend.add(packetInfo)
}
}
// TODO(brian): we're in a thread context mess here. we'll be sending these out from the bandwidthprobing
Expand Down Expand Up @@ -173,7 +175,9 @@ class ProbingDataSender(
paddingPacket.ssrc = senderSsrc
paddingPacket.timestamp = currDummyTimestamp
paddingPacket.sequenceNumber = currDummySeqNum
garbageDataSender.processPacket(PacketInfo(paddingPacket))
val packetInfo = PacketInfo(paddingPacket)
packetInfo.isProbing = true
garbageDataSender.processPacket(packetInfo)

currDummySeqNum++
bytesSent += packetLength
Expand Down
Loading