Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] New moving average value for sent/recv rates over last second #3009

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions srtcore/buffer_tools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,5 +273,65 @@ int CSndRateEstimator::incSampleIdx(int val, int inc) const
return val;
}

CMovingRateEstimator::CMovingRateEstimator()
: m_tsFirstSampleTime(sync::steady_clock::now())
, m_iCurSampleIdx(0)
, m_iRateBps(0)
, m_Samples(NUM_PERIODS)
{
resetRate(0, NUM_PERIODS);
}

void CMovingRateEstimator::addSample(int pkts, double bytes)
{
const time_point now = steady_clock::now();
const int iSampleDeltaIdx = int(count_milliseconds(now - m_tsLastSlotTimestamp) / SAMPLE_DURATION_MS);

if (iSampleDeltaIdx == 0)
{
m_Samples[m_iCurSampleIdx].m_iBytesCount += bytes;
m_Samples[m_iCurSampleIdx].m_iPktsCount += pkts;
}
else
{
if ((m_iCurSampleIdx + iSampleDeltaIdx) < NUM_PERIODS)
resetRate(m_iCurSampleIdx + 1, m_iCurSampleIdx + iSampleDeltaIdx);
else
{
int loopbackDiff = m_iCurSampleIdx + iSampleDeltaIdx - NUM_PERIODS;
resetRate(m_iCurSampleIdx + 1, NUM_PERIODS);
resetRate(0, loopbackDiff);
}

m_iCurSampleIdx = ((m_iCurSampleIdx + iSampleDeltaIdx) % NUM_PERIODS);
m_Samples[m_iCurSampleIdx].m_iBytesCount = bytes;
m_Samples[m_iCurSampleIdx].m_iPktsCount = pkts;

m_tsLastSlotTimestamp += milliseconds_from(SAMPLE_DURATION_MS * iSampleDeltaIdx);

computeAverageValue();
}
}

void CMovingRateEstimator::resetRate(int from, int to)
{
for (int i = max(0, from); i < min(int(NUM_PERIODS), to); i++)
m_Samples[i].reset();
}

void CMovingRateEstimator::computeAverageValue()
{
const time_point now = steady_clock::now();
const int startDelta = count_milliseconds(now - m_tsFirstSampleTime);
const bool isFirstPeriod = startDelta < (SAMPLE_DURATION_MS * NUM_PERIODS);
int newRateBps = 0;

for (int i = 0; i < NUM_PERIODS; i++)
newRateBps += (m_Samples[i].m_iBytesCount + (CPacket::HDR_SIZE * m_Samples[i].m_iPktsCount));

if (isFirstPeriod)
newRateBps = newRateBps * SAMPLE_DURATION_MS * NUM_PERIODS / max(1, startDelta);

m_iRateBps = newRateBps;
}
} // namespace srt
81 changes: 79 additions & 2 deletions srtcore/buffer_tools.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,87 @@ class CSndRateEstimator

Sample m_Samples[NUM_PERIODS];

time_point m_tsFirstSampleTime; //< Start time of the first sample.
time_point m_tsFirstSampleTime; //< Start time of the first sameple.
int m_iFirstSampleIdx; //< Index of the first sample.
int m_iCurSampleIdx; //< Index of the current sample being collected.
int m_iRateBps; //< Rate in Bytes/sec.
int m_iRateBps; // Input Rate in Bytes/sec
};

class CMovingRateEstimator
{
typedef sync::steady_clock::time_point time_point;

public:
CMovingRateEstimator();

/// Add sample.
/// @param [in] pkts number of packets in the sample.
/// @param [in] bytes number of payload bytes in the sample.
void addSample(int pkts = 0, double bytes = 0);

/// Clean the mobile measures table to reset average value.
void resetRate() { resetRate(0, NUM_PERIODS); };

/// Retrieve estimated bitrate in bytes per second with 16-byte packet header.
int getRate() const { return m_iRateBps; }

private:
// We would like responsiveness (accuracy) of rate estimation higher than 100 ms
// (ideally around 50 ms) for network adaptive algorithms.
static const int NUM_PERIODS = 100; // To get 1s of values
static const int SAMPLE_DURATION_MS = 10; // 10 ms
time_point m_tsFirstSampleTime; //< Start time of the first sample.
time_point m_tsLastSlotTimestamp; // Used to compute the delta between 2 calls
int m_iCurSampleIdx; //< Index of the current sample being collected.
int m_iRateBps; //< Rate in Bytes/sec.

struct Sample
{
int m_iPktsCount; // number of payload packets
int m_iBytesCount; // number of payload bytes

void reset()
{
m_iPktsCount = 0;
m_iBytesCount = 0;
}

Sample()
: m_iPktsCount(0)
, m_iBytesCount(0)
{
}

Sample(int iPkts, int iBytes)
: m_iPktsCount(iPkts)
, m_iBytesCount(iBytes)
{
}

Sample operator+(const Sample& other)
{
return Sample(m_iPktsCount + other.m_iPktsCount, m_iBytesCount + other.m_iBytesCount);
}

Sample& operator+=(const Sample& other)
{
*this = *this + other;
return *this;
}

bool empty() const { return m_iPktsCount == 0; }
};

srt::FixedArray<Sample> m_Samples; // Table of stored data

/// This method will compute the average value based on all table's measures and the period
/// (NUM_PERIODS*SAMPLE_DURATION_MS)
void computeAverageValue();

/// Reset a part of the stored measures
/// @param from The beginning where the reset have to be applied
/// @param to The last data that have to be reset
void resetRate(int from, int to);
};

} // namespace srt
Expand Down
13 changes: 10 additions & 3 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7569,10 +7569,15 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)
perf->pktRcvUndecryptTotal = m_stats.rcvr.undecrypted.total.count();
perf->byteRcvUndecryptTotal = m_stats.rcvr.undecrypted.total.bytes();


// Average values management
// We are updating rate with 0 Byte 0 packet to ensure an up to date compute in case we are not sending packet for a while.
m_stats.sndr.updateRate(0, 0);
m_stats.rcvr.updateRate(0, 0);
perf->mbpsSendRate = Bps2Mbps(m_stats.sndr.getAverageValue());
perf->mbpsRecvRate = Bps2Mbps(m_stats.rcvr.getAverageValue());

// TODO: The following class members must be protected with a different mutex, not the m_StatsLock.
const double interval = (double) count_microseconds(currtime - m_stats.tsLastSampleTime);
perf->mbpsSendRate = double(perf->byteSent) * 8.0 / interval;
perf->mbpsRecvRate = double(perf->byteRecv) * 8.0 / interval;
perf->usPktSndPeriod = (double) count_microseconds(m_tdSendInterval.load());
perf->pktFlowWindow = m_iFlowWindowSize.load();
perf->pktCongestionWindow = m_iCongestionWindow;
Expand Down Expand Up @@ -9725,6 +9730,7 @@ bool srt::CUDT::packData(CPacket& w_packet, steady_clock::time_point& w_nexttime
m_stats.sndr.sent.count(payload);
if (new_packet_packed)
m_stats.sndr.sentUnique.count(payload);
m_stats.sndr.updateRate(1, payload);
leaveCS(m_StatsLock);

const duration sendint = m_tdSendInterval;
Expand Down Expand Up @@ -10401,6 +10407,7 @@ int srt::CUDT::processData(CUnit* in_unit)

enterCS(m_StatsLock);
m_stats.rcvr.recvd.count(pktsz);
m_stats.rcvr.updateRate(1, pktsz);
leaveCS(m_StatsLock);

loss_seqs_t filter_loss_seqs;
Expand Down
17 changes: 17 additions & 0 deletions srtcore/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "platform_sys.h"
#include "packet.h"
#include "buffer_tools.h"

namespace srt
{
Expand Down Expand Up @@ -144,6 +145,8 @@ struct Sender
Metric<Packets> recvdAck; // The number of ACK packets received by the sender.
Metric<Packets> recvdNak; // The number of ACK packets received by the sender.

CMovingRateEstimator mavgRateEstimator; // The average Mbps over last second

void reset()
{
sent.reset();
Expand All @@ -167,6 +170,12 @@ struct Sender
recvdNak.resetTrace();
sentFilterExtra.resetTrace();
}

void updateRate(int pkts, double bytes) { mavgRateEstimator.addSample(pkts, bytes); }

void resetRate() { mavgRateEstimator.resetRate(); }

int getAverageValue() { return mavgRateEstimator.getRate(); }
};

/// Receiver-side statistics.
Expand All @@ -187,6 +196,8 @@ struct Receiver
Metric<Packets> sentAck; // The number of ACK packets sent by the receiver.
Metric<Packets> sentNak; // The number of NACK packets sent by the receiver.

CMovingRateEstimator mavgRateEstimator; // The average Mbps over last second

void reset()
{
recvd.reset();
Expand Down Expand Up @@ -218,6 +229,12 @@ struct Receiver
sentAck.resetTrace();
sentNak.resetTrace();
}

void updateRate(int pkts, double bytes) { mavgRateEstimator.addSample(pkts, bytes); }

void resetRate() { mavgRateEstimator.resetRate(); }

int getAverageValue() { return mavgRateEstimator.getRate(); }
};

} // namespace stats
Expand Down
Loading