diff --git a/srtcore/buffer_tools.cpp b/srtcore/buffer_tools.cpp index 7473e3824..cd6fcf21f 100644 --- a/srtcore/buffer_tools.cpp +++ b/srtcore/buffer_tools.cpp @@ -273,5 +273,65 @@ int CSndRateEstimator::incSampleIdx(int val, int inc) const return val; } +CMovingRateEstimator::CMovingRateEstimator() + : m_tsFirstSampleTime(sync::steady_clock::now()) + , m_iCurSampleIdx(0) + , m_iRateBps(0) + , m_Samples(NUM_PERIODS) +{ + resetRate(0, NUM_PERIODS); } +void CMovingRateEstimator::addSample(int pkts, double bytes) +{ + const time_point now = steady_clock::now(); + const int iSampleDeltaIdx = int(count_milliseconds(now - m_tsLastSlotTimestamp) / SAMPLE_DURATION_MS); + + if (iSampleDeltaIdx == 0) + { + m_Samples[m_iCurSampleIdx].m_iBytesCount += bytes; + m_Samples[m_iCurSampleIdx].m_iPktsCount += pkts; + } + else + { + if ((m_iCurSampleIdx + iSampleDeltaIdx) < NUM_PERIODS) + resetRate(m_iCurSampleIdx + 1, m_iCurSampleIdx + iSampleDeltaIdx); + else + { + int loopbackDiff = m_iCurSampleIdx + iSampleDeltaIdx - NUM_PERIODS; + resetRate(m_iCurSampleIdx + 1, NUM_PERIODS); + resetRate(0, loopbackDiff); + } + + m_iCurSampleIdx = ((m_iCurSampleIdx + iSampleDeltaIdx) % NUM_PERIODS); + m_Samples[m_iCurSampleIdx].m_iBytesCount = bytes; + m_Samples[m_iCurSampleIdx].m_iPktsCount = pkts; + + m_tsLastSlotTimestamp += milliseconds_from(SAMPLE_DURATION_MS * iSampleDeltaIdx); + + computeAverageValue(); + } +} + +void CMovingRateEstimator::resetRate(int from, int to) +{ + for (int i = max(0, from); i < min(int(NUM_PERIODS), to); i++) + m_Samples[i].reset(); +} + +void CMovingRateEstimator::computeAverageValue() +{ + const time_point now = steady_clock::now(); + const int startDelta = count_milliseconds(now - m_tsFirstSampleTime); + const bool isFirstPeriod = startDelta < (SAMPLE_DURATION_MS * NUM_PERIODS); + int newRateBps = 0; + + for (int i = 0; i < NUM_PERIODS; i++) + newRateBps += (m_Samples[i].m_iBytesCount + (CPacket::HDR_SIZE * m_Samples[i].m_iPktsCount)); + + if (isFirstPeriod) + newRateBps = newRateBps * SAMPLE_DURATION_MS * NUM_PERIODS / max(1, startDelta); + + m_iRateBps = newRateBps; +} +} // namespace srt diff --git a/srtcore/buffer_tools.h b/srtcore/buffer_tools.h index 88479827d..ebc5e6b79 100644 --- a/srtcore/buffer_tools.h +++ b/srtcore/buffer_tools.h @@ -192,10 +192,87 @@ class CSndRateEstimator Sample m_Samples[NUM_PERIODS]; - time_point m_tsFirstSampleTime; //< Start time of the first sample. + time_point m_tsFirstSampleTime; //< Start time of the first sameple. int m_iFirstSampleIdx; //< Index of the first sample. int m_iCurSampleIdx; //< Index of the current sample being collected. - int m_iRateBps; //< Rate in Bytes/sec. + int m_iRateBps; // Input Rate in Bytes/sec +}; + +class CMovingRateEstimator +{ + typedef sync::steady_clock::time_point time_point; + +public: + CMovingRateEstimator(); + + /// Add sample. + /// @param [in] pkts number of packets in the sample. + /// @param [in] bytes number of payload bytes in the sample. + void addSample(int pkts = 0, double bytes = 0); + + /// Clean the mobile measures table to reset average value. + void resetRate() { resetRate(0, NUM_PERIODS); }; + + /// Retrieve estimated bitrate in bytes per second with 16-byte packet header. + int getRate() const { return m_iRateBps; } + +private: + // We would like responsiveness (accuracy) of rate estimation higher than 100 ms + // (ideally around 50 ms) for network adaptive algorithms. + static const int NUM_PERIODS = 100; // To get 1s of values + static const int SAMPLE_DURATION_MS = 10; // 10 ms + time_point m_tsFirstSampleTime; //< Start time of the first sample. + time_point m_tsLastSlotTimestamp; // Used to compute the delta between 2 calls + int m_iCurSampleIdx; //< Index of the current sample being collected. + int m_iRateBps; //< Rate in Bytes/sec. + + struct Sample + { + int m_iPktsCount; // number of payload packets + int m_iBytesCount; // number of payload bytes + + void reset() + { + m_iPktsCount = 0; + m_iBytesCount = 0; + } + + Sample() + : m_iPktsCount(0) + , m_iBytesCount(0) + { + } + + Sample(int iPkts, int iBytes) + : m_iPktsCount(iPkts) + , m_iBytesCount(iBytes) + { + } + + Sample operator+(const Sample& other) + { + return Sample(m_iPktsCount + other.m_iPktsCount, m_iBytesCount + other.m_iBytesCount); + } + + Sample& operator+=(const Sample& other) + { + *this = *this + other; + return *this; + } + + bool empty() const { return m_iPktsCount == 0; } + }; + + srt::FixedArray m_Samples; // Table of stored data + + /// This method will compute the average value based on all table's measures and the period + /// (NUM_PERIODS*SAMPLE_DURATION_MS) + void computeAverageValue(); + + /// Reset a part of the stored measures + /// @param from The beginning where the reset have to be applied + /// @param to The last data that have to be reset + void resetRate(int from, int to); }; } // namespace srt diff --git a/srtcore/core.cpp b/srtcore/core.cpp index bae245392..f6d38e3a7 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -7569,10 +7569,15 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous) perf->pktRcvUndecryptTotal = m_stats.rcvr.undecrypted.total.count(); perf->byteRcvUndecryptTotal = m_stats.rcvr.undecrypted.total.bytes(); + + // Average values management + // We are updating rate with 0 Byte 0 packet to ensure an up to date compute in case we are not sending packet for a while. + m_stats.sndr.updateRate(0, 0); + m_stats.rcvr.updateRate(0, 0); + perf->mbpsSendRate = Bps2Mbps(m_stats.sndr.getAverageValue()); + perf->mbpsRecvRate = Bps2Mbps(m_stats.rcvr.getAverageValue()); + // TODO: The following class members must be protected with a different mutex, not the m_StatsLock. - const double interval = (double) count_microseconds(currtime - m_stats.tsLastSampleTime); - perf->mbpsSendRate = double(perf->byteSent) * 8.0 / interval; - perf->mbpsRecvRate = double(perf->byteRecv) * 8.0 / interval; perf->usPktSndPeriod = (double) count_microseconds(m_tdSendInterval.load()); perf->pktFlowWindow = m_iFlowWindowSize.load(); perf->pktCongestionWindow = m_iCongestionWindow; @@ -9725,6 +9730,7 @@ bool srt::CUDT::packData(CPacket& w_packet, steady_clock::time_point& w_nexttime m_stats.sndr.sent.count(payload); if (new_packet_packed) m_stats.sndr.sentUnique.count(payload); + m_stats.sndr.updateRate(1, payload); leaveCS(m_StatsLock); const duration sendint = m_tdSendInterval; @@ -10401,6 +10407,7 @@ int srt::CUDT::processData(CUnit* in_unit) enterCS(m_StatsLock); m_stats.rcvr.recvd.count(pktsz); + m_stats.rcvr.updateRate(1, pktsz); leaveCS(m_StatsLock); loss_seqs_t filter_loss_seqs; diff --git a/srtcore/stats.h b/srtcore/stats.h index 947489eb1..4b67378fd 100644 --- a/srtcore/stats.h +++ b/srtcore/stats.h @@ -13,6 +13,7 @@ #include "platform_sys.h" #include "packet.h" +#include "buffer_tools.h" namespace srt { @@ -144,6 +145,8 @@ struct Sender Metric recvdAck; // The number of ACK packets received by the sender. Metric recvdNak; // The number of ACK packets received by the sender. + CMovingRateEstimator mavgRateEstimator; // The average Mbps over last second + void reset() { sent.reset(); @@ -167,6 +170,12 @@ struct Sender recvdNak.resetTrace(); sentFilterExtra.resetTrace(); } + + void updateRate(int pkts, double bytes) { mavgRateEstimator.addSample(pkts, bytes); } + + void resetRate() { mavgRateEstimator.resetRate(); } + + int getAverageValue() { return mavgRateEstimator.getRate(); } }; /// Receiver-side statistics. @@ -187,6 +196,8 @@ struct Receiver Metric sentAck; // The number of ACK packets sent by the receiver. Metric sentNak; // The number of NACK packets sent by the receiver. + CMovingRateEstimator mavgRateEstimator; // The average Mbps over last second + void reset() { recvd.reset(); @@ -218,6 +229,12 @@ struct Receiver sentAck.resetTrace(); sentNak.resetTrace(); } + + void updateRate(int pkts, double bytes) { mavgRateEstimator.addSample(pkts, bytes); } + + void resetRate() { mavgRateEstimator.resetRate(); } + + int getAverageValue() { return mavgRateEstimator.getRate(); } }; } // namespace stats