diff --git a/src/main/kotlin/org/jitsi/nlj/PacketInfo.kt b/src/main/kotlin/org/jitsi/nlj/PacketInfo.kt index fe514bf96..931ad6e5a 100644 --- a/src/main/kotlin/org/jitsi/nlj/PacketInfo.kt +++ b/src/main/kotlin/org/jitsi/nlj/PacketInfo.kt @@ -90,6 +90,11 @@ open class PacketInfo @JvmOverloads constructor( */ var shouldDiscard: Boolean = false + /** + * Whether this packet was generated to be a probing packet. + */ + var isProbing: Boolean = false + /** * The ID of the endpoint associated with this packet (i.e. the source endpoint). */ diff --git a/src/main/kotlin/org/jitsi/nlj/rtp/TransportCcEngine.kt b/src/main/kotlin/org/jitsi/nlj/rtp/TransportCcEngine.kt index 466830333..556684821 100644 --- a/src/main/kotlin/org/jitsi/nlj/rtp/TransportCcEngine.kt +++ b/src/main/kotlin/org/jitsi/nlj/rtp/TransportCcEngine.kt @@ -71,12 +71,12 @@ class TransportCcEngine( private var localReferenceTime: Instant = NEVER /** - * Holds a key value pair of the packet sequence number and an object made - * up of the packet send time and the packet size. + * Holds a key value pair of the packet sequence number and the + * packet stats. */ - private val sentPacketDetails = LRUCache(MAX_OUTGOING_PACKETS_HISTORY) + private val sentPacketStats = LRUCache(MAX_OUTGOING_PACKETS_HISTORY) - private val missingPacketDetailSeqNums = mutableListOf() + private val missingPacketStatsSeqNums = mutableListOf() /** * Called when an RTP sender has a new round-trip time estimate. @@ -102,23 +102,23 @@ class TransportCcEngine( // We have to remember the oldest known sequence number here, as we // remove from sentPacketDetails inside this loop - val oldestKnownSeqNum = synchronized(sentPacketsSyncRoot) { sentPacketDetails.oldestEntry() } + val oldestKnownSeqNum = synchronized(sentPacketsSyncRoot) { sentPacketStats.oldestEntry() } for (packetReport in tccPacket) { val tccSeqNum = packetReport.seqNum - val packetDetail = synchronized(sentPacketsSyncRoot) { - sentPacketDetails.remove(tccSeqNum) + val packetStats = synchronized(sentPacketsSyncRoot) { + sentPacketStats.remove(tccSeqNum) } - if (packetDetail == null) { + if (packetStats == null) { if (packetReport is ReceivedPacketReport) { - missingPacketDetailSeqNums.add(tccSeqNum) + missingPacketStatsSeqNums.add(tccSeqNum) } continue } when (packetReport) { is UnreceivedPacketReport -> - bandwidthEstimator.processPacketLoss(now, packetDetail.packetSendTime, tccSeqNum) + bandwidthEstimator.processPacketLoss(now, packetStats) is ReceivedPacketReport -> { currArrivalTimestamp += packetReport.deltaDuration @@ -126,39 +126,40 @@ class TransportCcEngine( val arrivalTimeInLocalClock = currArrivalTimestamp - Duration.between(localReferenceTime, remoteReferenceTime) bandwidthEstimator.processPacketArrival( - now, packetDetail.packetSendTime, arrivalTimeInLocalClock, tccSeqNum, packetDetail.packetLength) + now, packetStats, arrivalTimeInLocalClock) } } } - if (missingPacketDetailSeqNums.isNotEmpty()) { + bandwidthEstimator.feedbackComplete(now) + + if (missingPacketStatsSeqNums.isNotEmpty()) { logger.warn("TCC packet contained received sequence numbers: " + "${tccPacket.iterator().asSequence() .filterIsInstance() .map(PacketReport::seqNum) .joinToString()}. " + - "Couldn't find packet detail for the seq nums: ${missingPacketDetailSeqNums.joinToString()}. " + + "Couldn't find packet stats for the seq nums: ${missingPacketStatsSeqNums.joinToString()}. " + (oldestKnownSeqNum?.let { "Oldest known seqNum was $it." } ?: run { "Sent packet details map was empty." })) - missingPacketDetailSeqNums.clear() + missingPacketStatsSeqNums.clear() } } - fun mediaPacketSent(tccSeqNum: Int, length: DataSize) { + fun mediaPacketSent( + tccSeqNum: Int, + length: DataSize, + isRtx: Boolean = false, + isProbing: Boolean = false + ) { synchronized(sentPacketsSyncRoot) { val now = clock.instant() - sentPacketDetails.put( - tccSeqNum and 0xFFFF, - PacketDetail(length, now)) + val modSeq = tccSeqNum and 0xFFFF + val stats = BandwidthEstimator.PacketStats(modSeq, length, now) + stats.isRtx = isRtx + stats.isProbing = isProbing + sentPacketStats.put(modSeq, stats) } } - /** - * [PacketDetail] is an object that holds the - * length(size) of the packet in [packetLength] - * and the time stamps of the outgoing packet - * in [packetSendTime] - */ - private data class PacketDetail internal constructor(internal val packetLength: DataSize, internal val packetSendTime: Instant) - companion object { /** * The maximum number of received packets and their timestamps to save. diff --git a/src/main/kotlin/org/jitsi/nlj/rtp/bandwidthestimation/BandwidthEstimator.kt b/src/main/kotlin/org/jitsi/nlj/rtp/bandwidthestimation/BandwidthEstimator.kt index 817e6ddd2..b91899404 100644 --- a/src/main/kotlin/org/jitsi/nlj/rtp/bandwidthestimation/BandwidthEstimator.kt +++ b/src/main/kotlin/org/jitsi/nlj/rtp/bandwidthestimation/BandwidthEstimator.kt @@ -58,6 +58,31 @@ abstract class BandwidthEstimator( /** The maximum bandwidth the estimator will return. */ abstract var maxBw: Bandwidth + /** + * Inform the bandwidth estimator about a packet that has been sent. + */ + fun processPacketTransmission( + now: Instant, + stats: PacketStats + ) { + if (timeSeriesLogger.isTraceEnabled) { + val point = diagnosticContext.makeTimeSeriesPoint("bwe_packet_transmission", now) + stats.addToTimeSeriesPoint(point) + timeSeriesLogger.trace(point) + } + doProcessPacketTransmission(now, stats) + } + + /** + * A subclass's implementation of [processPacketTransmission]. + * + * See that function for parameter details. + */ + protected abstract fun doProcessPacketTransmission( + now: Instant, + stats: PacketStats + ) + /** * Inform the bandwidth estimator about a packet that has arrived at its * destination. @@ -69,43 +94,38 @@ abstract class BandwidthEstimator( * It is possible (e.g., if feedback was lost) that neither * [processPacketArrival] nor [processPacketLoss] is called for a given [seq]. * - * The clocks reported by [now], [sendTime], and [recvTime] do not + * The clocks reported by [now], [stats.sendTime], and [recvTime] do not * necessarily have any relationship to each other, but must be consistent * within themselves across all calls to functions of this [BandwidthEstimator]. * + * All arrival and loss reports based on a single feedback message should have the + * same [now] value. [feedbackComplete] should be called once all feedback reports + * based on a single feedback message have been processed. + * * @param[now] The current time, when this function is called. - * @param[sendTime] The time the packet was sent, if known, or null. + * @param[stats] [PacketStats] about this packet that was sent. * @param[recvTime] The time the packet was received, if known, or null. - * @param[seq] A 16-bit sequence number of packets processed by this - * [BandwidthEstimator]. - * @param[size] The size of the packet. - * @param[ecn] The ECN markings with which the packet was received. + * @param[recvEcn] The ECN markings with which the packet was received. */ fun processPacketArrival( now: Instant, - sendTime: Instant?, + stats: PacketStats, recvTime: Instant?, - seq: Int, - size: DataSize, - ecn: Byte = 0 + recvEcn: Byte = 0 ) { if (timeSeriesLogger.isTraceEnabled) { val point = diagnosticContext.makeTimeSeriesPoint("bwe_packet_arrival", now) - if (sendTime != null) { - point.addField("sendTime", sendTime.formatMilli()) - } + stats.addToTimeSeriesPoint(point) if (recvTime != null) { point.addField("recvTime", recvTime.formatMilli()) } - point.addField("seq", seq) - point.addField("size", size.bytes) - if (ecn != 0.toByte()) { - point.addField("ecn", ecn) + if (recvEcn != 0.toByte()) { + point.addField("recvEcn", recvEcn) } timeSeriesLogger.trace(point) } - doProcessPacketArrival(now, sendTime, recvTime, seq, size, ecn) + doProcessPacketArrival(now, stats, recvTime, recvEcn) } /** @@ -115,32 +135,29 @@ abstract class BandwidthEstimator( */ protected abstract fun doProcessPacketArrival( now: Instant, - sendTime: Instant?, + stats: PacketStats, recvTime: Instant?, - seq: Int, - size: DataSize, - ecn: Byte = 0 + recvEcn: Byte = 0 ) /** * Inform the bandwidth estimator that a packet was lost. * + * All arrival and loss reports based on a single feedback message should have the + * same [now] value. [feedbackComplete] should be called once all feedback reports + * based on a single feedback message have been processed. + * * @param[now] The current time, when this function is called. - * @param[sendTime] The time the packet was sent, if known, or null. - * @param[seq] A 16-bit sequence number of packets processed by this - * [BandwidthEstimator]. + * @param[stats] [PacketStats] about this packet that was sent. */ - fun processPacketLoss(now: Instant, sendTime: Instant?, seq: Int) { + fun processPacketLoss(now: Instant, stats: PacketStats) { if (timeSeriesLogger.isTraceEnabled) { val point = diagnosticContext.makeTimeSeriesPoint("bwe_packet_loss", now) - if (sendTime != null) { - point.addField("sendTime", sendTime.formatMilli()) - } - point.addField("seq", seq) + stats.addToTimeSeriesPoint(point) timeSeriesLogger.trace(point) } - doProcessPacketLoss(now, sendTime, seq) + doProcessPacketLoss(now, stats) } /** @@ -148,7 +165,15 @@ abstract class BandwidthEstimator( * * See that function for parameter details. */ - protected abstract fun doProcessPacketLoss(now: Instant, sendTime: Instant?, seq: Int) + protected abstract fun doProcessPacketLoss(now: Instant, stats: PacketStats) + + /** + * Inform the bandwidht estimator that a block of feedback is complete. + * + * @param[now] The current time, when this function is called. This should match + * the value passed to [processPacketArrival] and [processPacketLoss]. + */ + abstract fun feedbackComplete(now: Instant) /** * Inform the bandwidth estimator about a new round-trip time value @@ -239,6 +264,45 @@ abstract class BandwidthEstimator( listeners.remove(listener) } + /** + * Information about a sent packet, as reported to a [BandwidthEstimator]. + */ + data class PacketStats( + /** + * A 16-bit sequence number of packets processed by this + * [BandwidthEstimator]. + */ + val seq: Int, + /** + * The size of the packet. + */ + val size: DataSize, + /** + * The time the packet was sent, if known, or null. + */ + val sendTime: Instant? + ) { + /** + * Whether the sent packet was a retransmission + */ + var isRtx: Boolean = false + + /** + * Whether the sent packet was a bandwidth probe + */ + var isProbing: Boolean = false + + fun addToTimeSeriesPoint(point: DiagnosticContext.TimeSeriesPoint) { + if (sendTime != null) { + point.addField("sendTime", sendTime.formatMilli()) + } + point.addField("seq", seq) + point.addField("size", size.bytes) + point.addField("rtx", isRtx) + point.addField("probing", isProbing) + } + } + /** * Holds a snapshot of stats specific to the bandwidth estimator. */ diff --git a/src/main/kotlin/org/jitsi/nlj/rtp/bandwidthestimation/GoogleCcEstimator.kt b/src/main/kotlin/org/jitsi/nlj/rtp/bandwidthestimation/GoogleCcEstimator.kt index 0eb692c67..84a7a6d19 100644 --- a/src/main/kotlin/org/jitsi/nlj/rtp/bandwidthestimation/GoogleCcEstimator.kt +++ b/src/main/kotlin/org/jitsi/nlj/rtp/bandwidthestimation/GoogleCcEstimator.kt @@ -4,7 +4,6 @@ import java.time.Duration import java.time.Instant import kotlin.properties.Delegates import org.jitsi.nlj.util.Bandwidth -import org.jitsi.nlj.util.DataSize import org.jitsi.nlj.util.bps import org.jitsi.nlj.util.createChildLogger import org.jitsi.nlj.util.kbps @@ -53,21 +52,25 @@ class GoogleCcEstimator(diagnosticContext: DiagnosticContext, parentLogger: Logg it.setMinMaxBitrate(minBw.bps.toInt(), maxBw.bps.toInt()) } - override fun doProcessPacketArrival(now: Instant, sendTime: Instant?, recvTime: Instant?, seq: Int, size: DataSize, ecn: Byte) { - if (sendTime != null && recvTime != null) { + override fun doProcessPacketTransmission(now: Instant, stats: PacketStats) { + /* Do nothing, Google CC doesn't care about packet transmission. */ + } + + override fun doProcessPacketArrival(now: Instant, stats: PacketStats, recvTime: Instant?, recvEcn: Byte) { + if (stats.sendTime != null && recvTime != null) { bitrateEstimatorAbsSendTime.incomingPacketInfo(now.toEpochMilli(), - recvTime.toEpochMilli(), sendTime.toEpochMilli(), size.bytes.toInt()) + recvTime.toEpochMilli(), stats.sendTime.toEpochMilli(), stats.size.bytes.toInt()) } sendSideBandwidthEstimation.updateReceiverEstimate(bitrateEstimatorAbsSendTime.latestEstimate) sendSideBandwidthEstimation.reportPacketArrived(now.toEpochMilli()) - - /* TODO: rate-limit how often we call updateEstimate? */ - sendSideBandwidthEstimation.updateEstimate(now.toEpochMilli()) - reportBandwidthEstimate(now, sendSideBandwidthEstimation.latestEstimate.bps) } - override fun doProcessPacketLoss(now: Instant, sendTime: Instant?, seq: Int) { + override fun doProcessPacketLoss(now: Instant, stats: PacketStats) { sendSideBandwidthEstimation.reportPacketLost(now.toEpochMilli()) + } + + override fun feedbackComplete(now: Instant) { + /* TODO: rate-limit how often we call updateEstimate? */ sendSideBandwidthEstimation.updateEstimate(now.toEpochMilli()) reportBandwidthEstimate(now, sendSideBandwidthEstimation.latestEstimate.bps) } diff --git a/src/main/kotlin/org/jitsi/nlj/transform/node/outgoing/ProbingDataSender.kt b/src/main/kotlin/org/jitsi/nlj/transform/node/outgoing/ProbingDataSender.kt index 88001e710..d24c44f06 100644 --- a/src/main/kotlin/org/jitsi/nlj/transform/node/outgoing/ProbingDataSender.kt +++ b/src/main/kotlin/org/jitsi/nlj/transform/node/outgoing/ProbingDataSender.kt @@ -143,7 +143,9 @@ class ProbingDataSender( // encapsulating packets as RTX (with the proper ssrc and payload type) so we // just need to find the packets to retransmit and forward them to the next node // TODO(brian): do we need to clone it here? - packetsToResend.add(PacketInfo(packet.clone())) + val packetInfo = PacketInfo(packet.clone()) + packetInfo.isProbing = true + packetsToResend.add(packetInfo) } } // TODO(brian): we're in a thread context mess here. we'll be sending these out from the bandwidthprobing @@ -173,7 +175,9 @@ class ProbingDataSender( paddingPacket.ssrc = senderSsrc paddingPacket.timestamp = currDummyTimestamp paddingPacket.sequenceNumber = currDummySeqNum - garbageDataSender.processPacket(PacketInfo(paddingPacket)) + val packetInfo = PacketInfo(paddingPacket) + packetInfo.isProbing = true + garbageDataSender.processPacket(packetInfo) currDummySeqNum++ bytesSent += packetLength diff --git a/src/main/kotlin/org/jitsi/nlj/transform/node/outgoing/TccSeqNumTagger.kt b/src/main/kotlin/org/jitsi/nlj/transform/node/outgoing/TccSeqNumTagger.kt index 953e6ddca..6f3a16e8d 100644 --- a/src/main/kotlin/org/jitsi/nlj/transform/node/outgoing/TccSeqNumTagger.kt +++ b/src/main/kotlin/org/jitsi/nlj/transform/node/outgoing/TccSeqNumTagger.kt @@ -16,6 +16,7 @@ package org.jitsi.nlj.transform.node.outgoing import org.jitsi.nlj.PacketInfo +import org.jitsi.nlj.format.RtxPayloadType import org.jitsi.nlj.rtp.RtpExtensionType.TRANSPORT_CC import org.jitsi.nlj.rtp.TransportCcEngine import org.jitsi.nlj.stats.NodeStatsBlock @@ -27,7 +28,7 @@ import org.jitsi.rtp.rtp.header_extensions.TccHeaderExtension class TccSeqNumTagger( private val transportCcEngine: TransportCcEngine? = null, - streamInformationStore: ReadOnlyStreamInformationStore + private val streamInformationStore: ReadOnlyStreamInformationStore ) : TransformerNode("TCC sequence number tagger") { private var currTccSeqNum: Int = 1 private var tccExtensionId: Int? = null @@ -45,7 +46,13 @@ class TccSeqNumTagger( ?: rtpPacket.addHeaderExtension(tccExtId, TccHeaderExtension.DATA_SIZE_BYTES) TccHeaderExtension.setSequenceNumber(ext, currTccSeqNum) - transportCcEngine?.mediaPacketSent(currTccSeqNum, rtpPacket.length.bytes) + val pt = streamInformationStore.rtpPayloadTypes[rtpPacket.payloadType.toByte()] + val isRtx = !packetInfo.isProbing && pt is RtxPayloadType + + transportCcEngine?.mediaPacketSent(currTccSeqNum, + rtpPacket.length.bytes, + isProbing = packetInfo.isProbing, + isRtx = isRtx) currTccSeqNum++ } diff --git a/src/test/kotlin/org/jitsi/nlj/rtp/bandwidthestimation/BandwidthEstimationTest.kt b/src/test/kotlin/org/jitsi/nlj/rtp/bandwidthestimation/BandwidthEstimationTest.kt index dc4cc3292..3fa302bc6 100644 --- a/src/test/kotlin/org/jitsi/nlj/rtp/bandwidthestimation/BandwidthEstimationTest.kt +++ b/src/test/kotlin/org/jitsi/nlj/rtp/bandwidthestimation/BandwidthEstimationTest.kt @@ -177,7 +177,9 @@ class PacketReceiver( assert(packet.sendTime <= now) /* All delay is send -> receive in this simulation, so one-way delay is rtt. */ estimator.onRttUpdate(now, Duration.between(packet.sendTime, now)) - estimator.processPacketArrival(now, packet.sendTime, now, seq, packet.packetSize) + val stats = BandwidthEstimator.PacketStats(seq, packet.packetSize, packet.sendTime) + estimator.processPacketArrival(now, stats, now) + estimator.feedbackComplete(now) seq++ val bw = estimator.getCurrentBw(now) if (timeSeriesLogger.isTraceEnabled) { diff --git a/src/test/kotlin/org/jitsi/nlj/rtp/bandwidthestimation/bwe-timeseries-to-medooze.pl b/src/test/kotlin/org/jitsi/nlj/rtp/bandwidthestimation/bwe-timeseries-to-medooze.pl new file mode 100755 index 000000000..b52a6dc62 --- /dev/null +++ b/src/test/kotlin/org/jitsi/nlj/rtp/bandwidthestimation/bwe-timeseries-to-medooze.pl @@ -0,0 +1,255 @@ +#!/usr/bin/perl -w + +# Parse the bandwidth estimation timeseries log output, and convert +# to CSV in the format expected by https://github.com/medooze/bwe-stats-viewer. +# Timeseries logging should be enabled for the relevant subclass of +# org.jitsi.nlj.rtp.bandwidthestimation.BandwidthEstimator, e.g. +# timeseries.org.jitsi.nlj.rtp.bandwidthestimation.GoogleCcEstimator.level=ALL + +# Output files will be generated for each endpoint, in the current directory, +# named $conf_name-$ep_id-$conf_time.csv + +use strict; +use Math::BigFloat; +use Scalar::Util; +use POSIX qw(strftime); + +my %endpoints; + +sub parse_line($) +{ + my ($line) = @_; + + my %ret; + + if ($line !~ /^{/gc) { + return undef; + } + + while (1) { + if ($line !~ /\G"([A-Za-z0-9_]*)":/gc) { + return undef; + } + my $field = $1; + if ($line =~ /\G"((?:[^\\"]|\\.)*)"/gc) { + $ret{$field} = $1; + } + elsif ($line =~ /\G([^",}]*)/gc) { + $ret{$field} = $1; + } + if ($line !~ /\G,/gc) { + last; + } + } + + if ($line !~ /\G}/gc) { + return undef; + } + + return \%ret; +} + +sub get_field($$) +{ + my ($line, $field) = @_; + if ($line =~ /"$field":"((?:[^\\"]|\\.)*)"[,}]/) { + return $1; + } + elsif ($line =~ /"$field":([^",}]*)/) { + return $1; + } + return undef; +} + +sub get_ep_key($) +{ + my ($line) = @_; + + my $conf_name = $line->{conf_name}; + my $ep_id = $line->{endpoint_id}; + my $conf_time = $line->{conf_creation_time_ms}; + + return undef if (!defined($conf_name) || !defined($ep_id) || !defined($conf_time)); + + my $ep_key = "$conf_name:$conf_time:$ep_id"; + if (!exists($endpoints{$ep_key})) { + $endpoints{$ep_key}{info} = [$conf_name, $conf_time, $ep_id]; + } + + return $ep_key; +} + +# Determine which of two values is "smaller" modulo a modulus. +# (Assume modulus is an even number.) +sub min_modulo($$$) +{ + my ($a, $b, $modulus) = @_; + return $a if !defined($b); + return $b if !defined($a); + + my $delta = ($b - $a) % $modulus; + $delta -= $modulus if ($delta > $modulus/2); + + return $delta < 0 ? $b : $a; +} + +while (<>) { + my ($line) = parse_line($_); + next if !defined($line); + + my $key = get_ep_key($line); + my $series = $line->{series}; + next if (!defined($key) || !defined($series)); + + if ($series eq "bwe_packet_arrival" || + $series eq "bwe_packet_loss" || + $series eq "bwe_rtt" || + $series eq "bwe_estimate") { + push(@{$endpoints{$key}{trace}}, $line); + } +} + +my $bzero = Math::BigFloat->new("0"); +my $b1e3 = Math::BigFloat->new("1e3"); + +# Convert ms to another unit, without loss of precision. +sub ms_to_unit($$) +{ + my ($val, $unit) = @_; + + my $ms; + if (Scalar::Util::blessed($val) && Scalar::Util::blessed($val) eq "Math::BigFloat") + { + $ms = $val; + } + else { + $ms = Math::BigFloat->new($val); + } + return $ms->bmul($unit)->bstr(); +} + +sub us($) +{ + my ($val) = @_; + return ms_to_unit($val, $b1e3); +} + +# Dummy wrapper to make code clearer to read +sub ms($) +{ + my ($val) = @_; + + return $val; +} + +# Descriptions of columns, from https://github.com/medooze/bwe-stats-viewer/blob/master/README.md + +# 0: Feedback timestamp for rtp packet +# 1: Transport wide seq num of rtp packet [Not currently used for stats viewer] [not populated] +# 2: Feedback packet num [Not currently used for stats viewer] [not populated] +# 3: total packet size +# 4: Sent time for packet on sender clock +# 5: Received timestamp for rtp packet pn receiver clock (or 0 if lost) +# 6: Delta time with previous sent rtp packet (0 if lost) [Not currently used for stats viewer] [populated] +# 7: Delta time with previous received timestamp (0 if lost) [Not currently used for stats viewer] [populated] +# 8: Delta sent time - delta received time +# 9: Raw Estimated bitrate [Not currently used for stats viewer] [not populated] +# 10: Target bitrate for probing +# 11: Available bitrate, adjusted bwe estimation reported back to the app (BWE minus RTX allocation based on packet loss) +# 12: rtt +# 13: mark flag of RTP packet [Not currently used for stats viewer] [not populated] +# 14: 1 if packet was a retransmission, 0 otherwise +# 15: 1 if packet was for probing, 0 otherwise +sub print_row($$$$$$$$$$$$$) { + my ($out,$fb_time, $size, $sent_time, $recv_time, $sent_delta, $recv_delta, $delta_delta, $target_bitrate, $avail_bitrate, $rtt, $is_rtx, $is_probing) = @_; + + $fb_time = us($fb_time); + $sent_time = us($sent_time); + $recv_time = us($recv_time); + $sent_delta = us($sent_delta); + $recv_delta = us($recv_delta); + $delta_delta = us($delta_delta); + $rtt = ms($rtt); + + print $out "$fb_time|0|0|$size|$sent_time|$recv_time|$sent_delta|$recv_delta|$delta_delta|0|$target_bitrate|$avail_bitrate|$rtt|0|$is_rtx|$is_probing\n"; +} + +foreach my $ep (sort keys %endpoints) { + my ($conf_name, $conf_time, $ep_id) = @{$endpoints{$ep}{info}}; + + my $conf_time_str; + if ($conf_time > 1e11) { + $conf_time_str = strftime("%F-%T", localtime($conf_time/1e3)); + } + else { + $conf_time_str = $conf_time; + } + + my $out_name = "$conf_name-$ep_id-$conf_time_str.csv"; + + open(my $out, ">", "$conf_name-$ep_id-$conf_time_str.csv") or die("$out_name: $!"); + + my ($last_rtt, $last_bitrate) = (0, 0); + my ($last_sent_time, $last_recv_time); + + print STDERR "Writing $out_name\n"; + + foreach my $line (@{$endpoints{$ep}{trace}}) { + my $series = $line->{series}; + if ($series eq "bwe_rtt") { + $last_rtt = $line->{rtt}; + } + elsif ($series eq "bwe_estimate") { + $last_bitrate = $line->{bw}; + } + elsif ($series eq "bwe_packet_loss") { + print_row($out, + $line->{time}, + $line->{size}, + exists($line->{sendTime}) ? $line->{sendTime} : "0", + 0, + 0, + 0, + 0, + $last_bitrate, + $last_bitrate, + $last_rtt, + $line->{rtx} eq "true" ? 1 : 0, + $line->{probing} eq "true" ? 1 : 0); + } + elsif ($series eq "bwe_packet_arrival") { + my $sendTime = $bzero; + my $recvTime = $bzero; + my $sent_delta = $bzero; + my $recv_delta = $bzero; + my $delta_delta = $bzero; + if (exists($line->{sendTime}) && exists($line->{recvTime})) { + $sendTime = Math::BigFloat->new($line->{sendTime}); + $recvTime = Math::BigFloat->new($line->{recvTime}); + if (defined($last_sent_time) && defined($last_recv_time)) { + $sent_delta = $sendTime - $last_sent_time; + $recv_delta = $recvTime - $last_recv_time; + $delta_delta = $sent_delta - $recv_delta; + } + $last_sent_time = $sendTime->copy(); + $last_recv_time = $recvTime->copy(); + } + + print_row($out, + $line->{time}, + $line->{size}, + $sendTime, + $recvTime, + $sent_delta, + $recv_delta, + $delta_delta, + $last_bitrate, + $last_bitrate, + $last_rtt, + $line->{rtx} eq "true" ? 1 : 0, + $line->{probing} eq "true" ? 1 : 0); + } + } + + close($out) or die("$out_name: $!"); +}