Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] New moving average value for sent/recv rates over last second #3009

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion srtcore/buffer_tools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,10 @@ void CRateEstimator::updateInputRate(const time_point& time, int pkts, int bytes

CSndRateEstimator::CSndRateEstimator(const time_point& tsNow)
: m_tsFirstSampleTime(tsNow)
, m_iFirstSampleIdx(0)
, m_iCurSampleIdx(0)
, m_iRateBps(0)
, m_Samples(NUM_PERIODS)
, m_iFirstSampleIdx(0)
{

}
Expand Down Expand Up @@ -273,5 +274,64 @@ int CSndRateEstimator::incSampleIdx(int val, int inc) const
return val;
}

CMovingRateEstimator::CMovingRateEstimator()
: m_tsFirstSampleTime(sync::steady_clock::now())
, m_iFirstSampleIdx(0)
, m_iCurSampleIdx(0)
, m_iRateBps(0)
, m_Samples(NUM_PERIODS)
{
resetRate(0, NUM_PERIODS);
}

void CMovingRateEstimator::addSample(int pkts, double bytes)
{
const time_point now = steady_clock::now();
const int iSampleDeltaIdx = (int)count_milliseconds(now - lastSlotTimestamp) / SAMPLE_DURATION_MS;

if (iSampleDeltaIdx == 0)
{
m_Samples[m_iCurSampleIdx].m_iBytesCount += bytes;
m_Samples[m_iCurSampleIdx].m_iPktsCount += pkts;
}
else
{
if ((m_iCurSampleIdx + iSampleDeltaIdx) < NUM_PERIODS)
resetRate(m_iCurSampleIdx + 1, m_iCurSampleIdx + iSampleDeltaIdx);
else
{
int loopbackDiff = m_iCurSampleIdx + iSampleDeltaIdx - NUM_PERIODS;
resetRate(m_iCurSampleIdx + 1, NUM_PERIODS);
resetRate(0, loopbackDiff);
}

m_iCurSampleIdx = ((m_iCurSampleIdx + iSampleDeltaIdx) % NUM_PERIODS);
m_Samples[m_iCurSampleIdx].m_iBytesCount = bytes;
m_Samples[m_iCurSampleIdx].m_iPktsCount = pkts;

lastSlotTimestamp = now;
}

computeAverageValue();
}

void CMovingRateEstimator::resetRate(int from, int to)
{
for (int i = max(0, from); i < min(int(NUM_PERIODS), to); i++)
m_Samples[i].reset();
}

void CMovingRateEstimator::computeAverageValue()
{
const time_point now = steady_clock::now();
const int startDelta = count_milliseconds(now - m_tsFirstSampleTime);
const bool isFirstPeriod = startDelta < 1000;
m_iRateBps = 0;

for (int i = 0; i < NUM_PERIODS; i++)
m_iRateBps += (m_Samples[i].m_iBytesCount + (CPacket::HDR_SIZE * m_Samples[i].m_iPktsCount));

if (isFirstPeriod)
m_iRateBps = m_iRateBps * 1000 / startDelta;
}
} // namespace srt
92 changes: 86 additions & 6 deletions srtcore/buffer_tools.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,87 @@ class CSndRateEstimator
/// including the current sampling interval.
int getCurrentRate() const;

protected:
time_point m_tsFirstSampleTime; //< Start time of the first sample.
int m_iCurSampleIdx; //< Index of the current sample being collected.
int m_iRateBps; //< Rate in Bytes/sec.

struct Sample
{
int m_iPktsCount; // number of payload packets
int m_iBytesCount; // number of payload bytes

void reset()
{
m_iPktsCount = 0;
m_iBytesCount = 0;
}

Sample()
: m_iPktsCount(0)
, m_iBytesCount(0)
{
}

Sample(int iPkts, int iBytes)
: m_iPktsCount(iPkts)
, m_iBytesCount(iBytes)
{
}

Sample operator+(const Sample& other)
{
return Sample(m_iPktsCount + other.m_iPktsCount, m_iBytesCount + other.m_iBytesCount);
}

Sample& operator+=(const Sample& other)
{
*this = *this + other;
return *this;
}

bool empty() const { return m_iPktsCount == 0; }
};

srt::FixedArray<Sample> m_Samples; // Table of stored data

private:
static const int NUM_PERIODS = 10;
static const int SAMPLE_DURATION_MS = 100; // 100 ms
int m_iFirstSampleIdx; //< Index of the first sample.

int incSampleIdx(int val, int inc = 1) const;
};

class CMovingRateEstimator
{
typedef sync::steady_clock::time_point time_point;

public:
CMovingRateEstimator();

/// Add sample.
/// @param [in] pkts number of packets in the sample.
/// @param [in] bytes number of payload bytes in the sample.
void addSample(int pkts = 0, double bytes = 0);

/// Clean the mobile measures table to reset average value.
void resetRate() { resetRate(0, NUM_PERIODS); };

/// Retrieve estimated bitrate in bytes per second with 16-byte packet header.
int getRate() const { return m_iRateBps; }

private:
// We would like responsiveness (accuracy) of rate estimation higher than 100 ms
// (ideally around 50 ms) for network adaptive algorithms.
const int NUM_PERIODS = 100; // To get 1s of values
const int SAMPLE_DURATION_MS = 10; // 10 ms
time_point m_tsFirstSampleTime; //< Start time of the first sample.
time_point lastSlotTimestamp; // Used to compute the delta between 2 calls
int m_iFirstSampleIdx; //< Index of the first sample.
int m_iCurSampleIdx; //< Index of the current sample being collected.
int m_iRateBps; //< Rate in Bytes/sec.

struct Sample
{
int m_iPktsCount; // number of payload packets
Expand Down Expand Up @@ -188,14 +266,16 @@ class CSndRateEstimator
bool empty() const { return m_iPktsCount == 0; }
};

int incSampleIdx(int val, int inc = 1) const;
srt::FixedArray<Sample> m_Samples; // Table of stored data

Sample m_Samples[NUM_PERIODS];
/// This method will compute the average value based on all table's measures and the period
/// (NUM_PERIODS*SAMPLE_DURATION_MS)
void computeAverageValue();

time_point m_tsFirstSampleTime; //< Start time of the first sample.
int m_iFirstSampleIdx; //< Index of the first sample.
int m_iCurSampleIdx; //< Index of the current sample being collected.
int m_iRateBps; //< Rate in Bytes/sec.
/// Reset a part of the stored measures
/// @param from The beginning where the reset have to be applied
/// @param to The last data that have to be reset
void resetRate(int from, int to);
};

} // namespace srt
Expand Down
13 changes: 10 additions & 3 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7569,10 +7569,15 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)
perf->pktRcvUndecryptTotal = m_stats.rcvr.undecrypted.total.count();
perf->byteRcvUndecryptTotal = m_stats.rcvr.undecrypted.total.bytes();


// Average values management
// We are updating rate with 0 Byte 0 packet to ensure an up to date compute in case we are not sending packet for a while.
m_stats.sndr.updateRate(0, 0);
m_stats.rcvr.updateRate(0, 0);
perf->mbpsSendRate = Bps2Mbps(m_stats.sndr.getAverageValue());
perf->mbpsRecvRate = Bps2Mbps(m_stats.rcvr.getAverageValue());

// TODO: The following class members must be protected with a different mutex, not the m_StatsLock.
const double interval = (double) count_microseconds(currtime - m_stats.tsLastSampleTime);
perf->mbpsSendRate = double(perf->byteSent) * 8.0 / interval;
perf->mbpsRecvRate = double(perf->byteRecv) * 8.0 / interval;
perf->usPktSndPeriod = (double) count_microseconds(m_tdSendInterval.load());
perf->pktFlowWindow = m_iFlowWindowSize.load();
perf->pktCongestionWindow = m_iCongestionWindow;
Expand Down Expand Up @@ -9725,6 +9730,7 @@ bool srt::CUDT::packData(CPacket& w_packet, steady_clock::time_point& w_nexttime
m_stats.sndr.sent.count(payload);
if (new_packet_packed)
m_stats.sndr.sentUnique.count(payload);
m_stats.sndr.updateRate(1, payload);
leaveCS(m_StatsLock);

const duration sendint = m_tdSendInterval;
Expand Down Expand Up @@ -10401,6 +10407,7 @@ int srt::CUDT::processData(CUnit* in_unit)

enterCS(m_StatsLock);
m_stats.rcvr.recvd.count(pktsz);
m_stats.rcvr.updateRate(1, pktsz);
leaveCS(m_StatsLock);

loss_seqs_t filter_loss_seqs;
Expand Down
17 changes: 17 additions & 0 deletions srtcore/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "platform_sys.h"
#include "packet.h"
#include "buffer_tools.h"

namespace srt
{
Expand Down Expand Up @@ -144,6 +145,8 @@ struct Sender
Metric<Packets> recvdAck; // The number of ACK packets received by the sender.
Metric<Packets> recvdNak; // The number of ACK packets received by the sender.

CMovingRateEstimator mobileRateEstimator; // The average Mbps over last second

void reset()
{
sent.reset();
Expand All @@ -167,6 +170,12 @@ struct Sender
recvdNak.resetTrace();
sentFilterExtra.resetTrace();
}

void updateRate(int pkts, double bytes) { mobileRateEstimator.addSample(pkts, bytes); }

void resetRate() { mobileRateEstimator.resetRate(); }

int getAverageValue() { return mobileRateEstimator.getRate(); }
};

/// Receiver-side statistics.
Expand All @@ -187,6 +196,8 @@ struct Receiver
Metric<Packets> sentAck; // The number of ACK packets sent by the receiver.
Metric<Packets> sentNak; // The number of NACK packets sent by the receiver.

CMovingRateEstimator mobileRateEstimator; // The average Mbps over last second

void reset()
{
recvd.reset();
Expand Down Expand Up @@ -218,6 +229,12 @@ struct Receiver
sentAck.resetTrace();
sentNak.resetTrace();
}

void updateRate(int pkts, double bytes) { mobileRateEstimator.addSample(pkts, bytes); }

void resetRate() { mobileRateEstimator.resetRate(); }

int getAverageValue() { return mobileRateEstimator.getRate(); }
};

} // namespace stats
Expand Down
Loading