Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] New moving average value for sent/recv rates over last second #3009

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions srtcore/buffer_tools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,5 +273,66 @@ int CSndRateEstimator::incSampleIdx(int val, int inc) const
return val;
}

CMovingRateEstimator::CMovingRateEstimator()
: m_tsFirstSampleTime(sync::steady_clock::now())
, m_iFirstSampleIdx(0)
, m_iCurSampleIdx(0)
, m_iRateBps(0)
, m_Samples(NUM_PERIODS)
{
resetRate(0, NUM_PERIODS);
}

void CMovingRateEstimator::addSample(int pkts, double bytes)
{
const time_point now = steady_clock::now();
const int iSampleDeltaIdx = int(count_milliseconds(now - m_tsLastSlotTimestamp) / SAMPLE_DURATION_MS);

if (iSampleDeltaIdx == 0)
{
m_Samples[m_iCurSampleIdx].m_iBytesCount += bytes;
m_Samples[m_iCurSampleIdx].m_iPktsCount += pkts;
}
else
{
if ((m_iCurSampleIdx + iSampleDeltaIdx) < NUM_PERIODS)
resetRate(m_iCurSampleIdx + 1, m_iCurSampleIdx + iSampleDeltaIdx);
else
{
int loopbackDiff = m_iCurSampleIdx + iSampleDeltaIdx - NUM_PERIODS;
resetRate(m_iCurSampleIdx + 1, NUM_PERIODS);
resetRate(0, loopbackDiff);
}

m_iCurSampleIdx = ((m_iCurSampleIdx + iSampleDeltaIdx) % NUM_PERIODS);
m_Samples[m_iCurSampleIdx].m_iBytesCount = bytes;
m_Samples[m_iCurSampleIdx].m_iPktsCount = pkts;

m_tsLastSlotTimestamp += milliseconds_from(SAMPLE_DURATION_MS * iSampleDeltaIdx);

computeAverageValue();
}
}

void CMovingRateEstimator::resetRate(int from, int to)
{
for (int i = max(0, from); i < min(int(NUM_PERIODS), to); i++)
m_Samples[i].reset();
}

void CMovingRateEstimator::computeAverageValue()
{
const time_point now = steady_clock::now();
const int startDelta = count_milliseconds(now - m_tsFirstSampleTime);
const bool isFirstPeriod = startDelta < (SAMPLE_DURATION_MS * NUM_PERIODS);
int newRateBps = 0;

for (int i = 0; i < NUM_PERIODS; i++)
newRateBps += (m_Samples[i].m_iBytesCount + (CPacket::HDR_SIZE * m_Samples[i].m_iPktsCount));

if (isFirstPeriod)
newRateBps = newRateBps * 1000 / startDelta;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!!! Division by zero not prevented!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is :) We don't go in the computeAverageValue method if iSampleDeltaIdx == 0.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[ RUN      ] TestSocketOptions.LossMaxTTL
/home/travis/.travis/functions: line 109:  7133 Floating point exception(core dumped) ./test-srt -disable-ipv6
[New LWP 7133]
[New LWP 7746]
[New LWP 7749]
[New LWP 7750]
[New LWP 7745]
[New LWP 7747]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
Core was generated by `./test-srt -disable-ipv6'.
Program terminated with signal SIGFPE, Arithmetic exception.
#0  0x000000000087cafc in srt::CMovingRateEstimator::computeAverageValue (this=0x7fe6bc005cb8) at /home/travis/build/Haivision/srt/srtcore/buffer_tools.cpp:334
334	        newRateBps = newRateBps * 1000 / startDelta;
[Current thread is 1 (Thread 0x7fe6c9ee9740 (LWP 7133))]
#0  0x000000000087cafc in srt::CMovingRateEstimator::computeAverageValue (this=0x7fe6bc005cb8) at /home/travis/build/Haivision/srt/srtcore/buffer_tools.cpp:334
#1  0x000000000087c94d in srt::CMovingRateEstimator::addSample (this=0x7fe6bc005cb8, pkts=0, bytes=0) at /home/travis/build/Haivision/srt/srtcore/buffer_tools.cpp:313
#2  0x00000000008d80bb in srt::stats::Sender::updateRate (this=0x7fe6bc005c18, pkts=0, bytes=0) at /home/travis/build/Haivision/srt/srtcore/stats.h:174
#3  0x00000000008b934a in srt::CUDT::bstats (this=0x7fe6bc000938, perf=0x7ffd64377750, clear=false, instantaneous=false) at /home/travis/build/Haivision/srt/srtcore/core.cpp:7577
#4  0x000000000085bfe3 in srt::CUDT::bstats (u=412356694, perf=0x7ffd64377750, clear=false, instantaneous=false) at /home/travis/build/Haivision/srt/srtcore/api.cpp:4338

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is :) We don't go in the computeAverageValue method if iSampleDeltaIdx == 0.

The division by 0 is made using the value that is calculated out of the present time taken exactly in this function, so there's even no possibility that you could control this value this way.


m_iRateBps = newRateBps;
}
} // namespace srt
82 changes: 80 additions & 2 deletions srtcore/buffer_tools.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,88 @@ class CSndRateEstimator

Sample m_Samples[NUM_PERIODS];

time_point m_tsFirstSampleTime; //< Start time of the first sample.
time_point m_tsFirstSampleTime; //< Start time of the first sameple.
int m_iFirstSampleIdx; //< Index of the first sample.
int m_iCurSampleIdx; //< Index of the current sample being collected.
int m_iRateBps; //< Rate in Bytes/sec.
int m_iRateBps; // Input Rate in Bytes/sec
};

class CMovingRateEstimator
{
typedef sync::steady_clock::time_point time_point;

public:
CMovingRateEstimator();

/// Add sample.
/// @param [in] pkts number of packets in the sample.
/// @param [in] bytes number of payload bytes in the sample.
void addSample(int pkts = 0, double bytes = 0);

/// Clean the mobile measures table to reset average value.
void resetRate() { resetRate(0, NUM_PERIODS); };

/// Retrieve estimated bitrate in bytes per second with 16-byte packet header.
int getRate() const { return m_iRateBps; }

private:
// We would like responsiveness (accuracy) of rate estimation higher than 100 ms
// (ideally around 50 ms) for network adaptive algorithms.
static const int NUM_PERIODS = 100; // To get 1s of values
static const int SAMPLE_DURATION_MS = 10; // 10 ms
time_point m_tsFirstSampleTime; //< Start time of the first sample.
time_point m_tsLastSlotTimestamp; // Used to compute the delta between 2 calls
int m_iFirstSampleIdx; //< Index of the first sample.
int m_iCurSampleIdx; //< Index of the current sample being collected.
int m_iRateBps; //< Rate in Bytes/sec.

struct Sample
{
int m_iPktsCount; // number of payload packets
int m_iBytesCount; // number of payload bytes

void reset()
{
m_iPktsCount = 0;
m_iBytesCount = 0;
}

Sample()
: m_iPktsCount(0)
, m_iBytesCount(0)
{
}

Sample(int iPkts, int iBytes)
: m_iPktsCount(iPkts)
, m_iBytesCount(iBytes)
{
}

Sample operator+(const Sample& other)
{
return Sample(m_iPktsCount + other.m_iPktsCount, m_iBytesCount + other.m_iBytesCount);
}

Sample& operator+=(const Sample& other)
{
*this = *this + other;
return *this;
}

bool empty() const { return m_iPktsCount == 0; }
};

srt::FixedArray<Sample> m_Samples; // Table of stored data

/// This method will compute the average value based on all table's measures and the period
/// (NUM_PERIODS*SAMPLE_DURATION_MS)
void computeAverageValue();

/// Reset a part of the stored measures
/// @param from The beginning where the reset have to be applied
/// @param to The last data that have to be reset
void resetRate(int from, int to);
};

} // namespace srt
Expand Down
13 changes: 10 additions & 3 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7569,10 +7569,15 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)
perf->pktRcvUndecryptTotal = m_stats.rcvr.undecrypted.total.count();
perf->byteRcvUndecryptTotal = m_stats.rcvr.undecrypted.total.bytes();


// Average values management
// We are updating rate with 0 Byte 0 packet to ensure an up to date compute in case we are not sending packet for a while.
m_stats.sndr.updateRate(0, 0);
m_stats.rcvr.updateRate(0, 0);
perf->mbpsSendRate = Bps2Mbps(m_stats.sndr.getAverageValue());
perf->mbpsRecvRate = Bps2Mbps(m_stats.rcvr.getAverageValue());

// TODO: The following class members must be protected with a different mutex, not the m_StatsLock.
const double interval = (double) count_microseconds(currtime - m_stats.tsLastSampleTime);
perf->mbpsSendRate = double(perf->byteSent) * 8.0 / interval;
perf->mbpsRecvRate = double(perf->byteRecv) * 8.0 / interval;
perf->usPktSndPeriod = (double) count_microseconds(m_tdSendInterval.load());
perf->pktFlowWindow = m_iFlowWindowSize.load();
perf->pktCongestionWindow = m_iCongestionWindow;
Expand Down Expand Up @@ -9725,6 +9730,7 @@ bool srt::CUDT::packData(CPacket& w_packet, steady_clock::time_point& w_nexttime
m_stats.sndr.sent.count(payload);
if (new_packet_packed)
m_stats.sndr.sentUnique.count(payload);
m_stats.sndr.updateRate(1, payload);
leaveCS(m_StatsLock);

const duration sendint = m_tdSendInterval;
Expand Down Expand Up @@ -10401,6 +10407,7 @@ int srt::CUDT::processData(CUnit* in_unit)

enterCS(m_StatsLock);
m_stats.rcvr.recvd.count(pktsz);
m_stats.rcvr.updateRate(1, pktsz);
leaveCS(m_StatsLock);

loss_seqs_t filter_loss_seqs;
Expand Down
17 changes: 17 additions & 0 deletions srtcore/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "platform_sys.h"
#include "packet.h"
#include "buffer_tools.h"

namespace srt
{
Expand Down Expand Up @@ -144,6 +145,8 @@ struct Sender
Metric<Packets> recvdAck; // The number of ACK packets received by the sender.
Metric<Packets> recvdNak; // The number of ACK packets received by the sender.

CMovingRateEstimator mavgRateEstimator; // The average Mbps over last second

void reset()
{
sent.reset();
Expand All @@ -167,6 +170,12 @@ struct Sender
recvdNak.resetTrace();
sentFilterExtra.resetTrace();
}

void updateRate(int pkts, double bytes) { mavgRateEstimator.addSample(pkts, bytes); }

void resetRate() { mavgRateEstimator.resetRate(); }

int getAverageValue() { return mavgRateEstimator.getRate(); }
};

/// Receiver-side statistics.
Expand All @@ -187,6 +196,8 @@ struct Receiver
Metric<Packets> sentAck; // The number of ACK packets sent by the receiver.
Metric<Packets> sentNak; // The number of NACK packets sent by the receiver.

CMovingRateEstimator mavgRateEstimator; // The average Mbps over last second

void reset()
{
recvd.reset();
Expand Down Expand Up @@ -218,6 +229,12 @@ struct Receiver
sentAck.resetTrace();
sentNak.resetTrace();
}

void updateRate(int pkts, double bytes) { mavgRateEstimator.addSample(pkts, bytes); }

void resetRate() { mavgRateEstimator.resetRate(); }

int getAverageValue() { return mavgRateEstimator.getRate(); }
};

} // namespace stats
Expand Down
Loading