Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added special container to collect loss stats. Added reorder stats #1219

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/apputil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ class SrtStatsJson : public SrtStatsWriter
output << "\"packets\":" << mon.pktRecv << ",";
output << "\"packetsLost\":" << mon.pktRcvLoss << ",";
output << "\"packetsDropped\":" << mon.pktRcvDrop << ",";
output << "\"packetsReordered\":" << mon.pktRcvReorder << ",";
output << "\"packetsRetransmitted\":" << mon.pktRcvRetrans << ",";
output << "\"packetsBelated\":" << mon.pktRcvBelated << ",";
output << "\"packetsFilterExtra\":" << mon.pktRcvFilterExtra << ",";
Expand Down Expand Up @@ -447,7 +448,7 @@ class SrtStatsCols : public SrtStatsWriter
output << "FILTER RX SUPPL: " << setw(11) << mon.pktRcvFilterSupply << " RX LOSS: " << setw(11) << mon.pktRcvFilterLoss << endl;
output << "RATE SENDING: " << setw(11) << mon.mbpsSendRate << " RECEIVING: " << setw(11) << mon.mbpsRecvRate << endl;
output << "BELATED RECEIVED: " << setw(11) << mon.pktRcvBelated << " AVG TIME: " << setw(11) << mon.pktRcvAvgBelatedTime << endl;
output << "REORDER DISTANCE: " << setw(11) << mon.pktReorderDistance << endl;
output << "REORDER DISTANCE: " << setw(11) << mon.pktReorderDistance << " TOLERANCE: " << setw(11) << mon.pktReorderTolerance << " RX PKTS: " << setw(11) << mon.pktRcvReorder << endl;
output << "WINDOW FLOW: " << setw(11) << mon.pktFlowWindow << " CONGESTION: " << setw(11) << mon.pktCongestionWindow << " FLIGHT: " << setw(11) << mon.pktFlightSize << endl;
output << "LINK RTT: " << setw(9) << mon.msRTT << "ms BANDWIDTH: " << setw(7) << mon.mbpsBandwidth << "Mb/s " << endl;
output << "BUFFERLEFT: SND: " << setw(11) << mon.byteAvailSndBuf << " RCV: " << setw(11) << mon.byteAvailRcvBuf << endl;
Expand Down
62 changes: 52 additions & 10 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1358,6 +1358,8 @@ void CUDT::clearData()
m_stats.traceSndDrop = 0;
m_stats.rcvDropTotal = 0;
m_stats.traceRcvDrop = 0;
m_stats.traceRcvReorder = 0;
m_stats.rcvReorderTotal = 0;

m_stats.m_rcvUndecryptTotal = 0;
m_stats.traceRcvUndecrypt = 0;
Expand Down Expand Up @@ -5655,6 +5657,19 @@ bool CUDT::prepareConnectionObjects(const CHandShake &hs, HandshakeSide hsd, CUD
}
}

// This capacity is only a special protection against container swelling.
// It can be potentially changed any time and it's only the maximum capacity
// over which oldest records will be removed forcefully.
//
// Setting to half the size of the buffer because the biggest capacity
// can be achieved when every 2nd packet is lost and every record contains
// a loss information of one sequence. Every case of wider range of loss
// sequence will make it even smaller. The only change that it makes for
// the overflow-deleted records is that they can no longer be potentially
// treated as reorderd and will be treated as definitely lost. Of course,
// the ACK dismissal should still remove them way before then.
m_StatsLoss.capacity = m_iRcvBufSize/2;

try
{
m_pSndBuffer = new CSndBuffer(32, m_iMaxSRTPayloadSize);
Expand Down Expand Up @@ -7238,6 +7253,7 @@ void CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)

perf->pktSndDrop = m_stats.traceSndDrop;
perf->pktRcvDrop = m_stats.traceRcvDrop + m_stats.traceRcvUndecrypt;
perf->pktRcvReorder = m_stats.traceRcvReorder;
perf->byteSndDrop = m_stats.traceSndBytesDrop + (m_stats.traceSndDrop * pktHdrSize);
perf->byteRcvDrop =
m_stats.traceRcvBytesDrop + (m_stats.traceRcvDrop * pktHdrSize) + m_stats.traceRcvBytesUndecrypt;
Expand All @@ -7253,6 +7269,7 @@ void CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)
perf->pktRecvACKTotal = m_stats.recvACKTotal;
perf->pktSentNAKTotal = m_stats.sentNAKTotal;
perf->pktRecvNAKTotal = m_stats.recvNAKTotal;
perf->pktRcvReorderTotal = m_stats.rcvReorderTotal;
perf->usSndDurationTotal = m_stats.m_sndDurationTotal;

perf->byteSentTotal = m_stats.bytesSentTotal + (m_stats.sentTotal * pktHdrSize);
Expand Down Expand Up @@ -7379,6 +7396,7 @@ void CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)
{
m_stats.traceSndDrop = 0;
m_stats.traceRcvDrop = 0;
m_stats.traceRcvReorder = 0;
m_stats.traceSndBytesDrop = 0;
m_stats.traceRcvBytesDrop = 0;
m_stats.traceRcvUndecrypt = 0;
Expand Down Expand Up @@ -8028,6 +8046,7 @@ void CUDT::updateSndLossListOnACK(int32_t ackdata_seqno)
}

const steady_clock::time_point currtime = steady_clock::now();

// record total time used for sending
enterCS(m_StatsLock);
m_stats.sndDuration += count_microseconds(currtime - m_stats.sndDurationCounter);
Expand Down Expand Up @@ -9373,23 +9392,31 @@ int CUDT::processData(CUnit* in_unit)
// >1 - jump over a packet loss (loss = seqdiff-1)
if (diff > 1)
{
CGuard lg(m_StatsLock);
int loss = diff - 1; // loss is all that is above diff == 1
m_stats.traceRcvLoss += loss;
m_stats.rcvLossTotal += loss;
uint64_t lossbytes = loss * m_pRcvBuffer->getRcvAvgPayloadSize();
m_stats.traceRcvBytesLoss += lossbytes;
m_stats.rcvBytesLossTotal += lossbytes;
HLOGC(mglog.Debug,
log << "LOSS STATS: n=" << loss << " SEQ: [" << CSeqNo::incseq(m_iRcvCurrPhySeqNo) << " "
<< CSeqNo::decseq(packet.m_iSeqNo) << "]");
int32_t loss_lo = CSeqNo::incseq(m_iRcvCurrPhySeqNo);
int32_t loss_hi = CSeqNo::decseq(packet.m_iSeqNo);
m_StatsLoss.add(loss_lo, loss_hi);
HLOGC(mglog.Debug, log << "MISSING: n=" << (CSeqNo::seqlen(m_iRcvCurrPhySeqNo, packet.m_iSeqNo)-1)
<< " %(" << loss_lo << "-" << loss_hi << ")");
}

if (diff > 0)
{
// Record if it was further than latest
m_iRcvCurrPhySeqNo = packet.m_iSeqNo;
}
else if (pktrexmitflag == 0)
{
// Old, "not retransmitted" packet, that is, reordered
if (m_StatsLoss.unlose(packet.m_iSeqNo))
{
// Count it only if this belated packet was notified as lost
// (This avoids counting phantom packets as reordered)
HLOGC(mglog.Debug, log << "UNLOST %" << packet.m_iSeqNo);
CGuard statslock(m_StatsLock);
m_stats.rcvReorderTotal++;
m_stats.traceRcvReorder++;
}
}
}

{
Expand Down Expand Up @@ -10526,6 +10553,21 @@ int CUDT::checkACKTimer(const steady_clock::time_point &currtime)
because_decision = BECAUSE_LITEACK;
}

if (because_decision != BECAUSE_NO_REASON)
{
// m_iRcvCurrSeqNo is the sequence number used to send ACK
int loss = m_StatsLoss.dismiss(m_iRcvCurrSeqNo);
if (loss)
{
CGuard lock(m_StatsLock);
m_stats.traceRcvLoss += loss;
m_stats.rcvLossTotal += loss;
uint64_t lossbytes = loss * m_pRcvBuffer->getRcvAvgPayloadSize();
m_stats.traceRcvBytesLoss += lossbytes;
m_stats.rcvBytesLossTotal += lossbytes;
}
}

return because_decision;
}

Expand Down
114 changes: 111 additions & 3 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,115 @@ const size_t GRPD_FIELD_SIZE = sizeof(int32_t);
// For HSv4 legacy handshake
#define SRT_MAX_HSRETRY 10 /* Maximum SRT handshake retry */

enum SeqPairItems
struct StatsLossRecord
{
SEQ_BEGIN = 0, SEQ_END = 1, SEQ_SIZE = 2
std::deque< std::pair<int32_t, int32_t> > record;
int capacity;
int forgotten_lost;

StatsLossRecord(): capacity(16), forgotten_lost(0)
{
}

void add(int32_t lo, int32_t hi)
{
record.push_back(std::make_pair(lo, hi));
if (record.size() > size_t(capacity))
{
forgotten_lost += CSeqNo::seqoff(record.front().first, record.front().second);
record.pop_front();
}
}

// Function called at the time of acknowledging.
// This dismisses all collected missing packets and
// treats them all as lost. Possible very late reordered
// packets past that sequence might be mistakenly treated as lost.
int dismiss(int32_t upto)
{
int count_loss = 0;
size_t i = 0;
for (; i < record.size(); ++i)
{
// If the end-range is earlier than upto,
// dismiss this anyway.
if (CSeqNo::seqcmp(upto, record[i].second) >= 0)
{
count_loss += CSeqNo::seqlen(record[i].first, record[i].second);
continue;
}

if (CSeqNo::seqcmp(upto, record[i].first) < 0)
{
break; // None of these - nor any following.
}

// This record should be removed only partially.
// So, shift the start range to required value that
// should stay, and keep this record.
count_loss += CSeqNo::seqlen(record[i].first, upto);
record[i].first = CSeqNo::incseq(upto);
break;
}

record.erase(record.begin(), record.begin() + i);
count_loss += forgotten_lost;
forgotten_lost = 0;
return count_loss;
}

// This function declares given sequence as not lost.
// It removes it simply from the loss record so that
// dismissal will not count it anymore.
bool unlose(int32_t seq)
{
if (record.empty())
return false;

// Prematurely check against the end to avoid looping
if (CSeqNo::seqcmp(seq, record.back().second) > 0)
return false;

// Find the record
for (size_t i = 0; i < record.size(); ++i)
{
if (CSeqNo::seqcmp(seq, record[i].first) < 0)
break; // Previous record was before it, or not found at all

if (CSeqNo::seqcmp(seq, record[i].second) > 0)
continue; // Not this block, but maybe next

// Found this record. Slice, or split in two.
if (seq == record[i].first)
{
if (seq == record[i].second) // one-seq record
{
record.erase(record.begin()+i);
return true;
}

record[i].first = CSeqNo::incseq(record[i].first);
return true;
}

if (seq == record[i].second)
{
record[i].second = CSeqNo::decseq(record[i].second);
return true;
}

// It's in the middle, so this needs splitting.
int32_t new_begin = CSeqNo::incseq(seq);
int32_t new_end = CSeqNo::decseq(seq);
int32_t old_begin = record[i].first;
record[i].first = new_begin;
record.insert(record.begin()+i, std::make_pair(old_begin, new_end));
return true;
}

return false; // record not found
}

};

// Extended SRT Congestion control class - only an incomplete definition required
Expand Down Expand Up @@ -586,7 +692,6 @@ class CUDTGroup
//typedef StaticBuffer<BufferedMessage, 1000> senderBuffer_t;

private:

// Fields required for SRT_GTYPE_BACKUP groups.
senderBuffer_t m_SenderBuffer;
int32_t m_iSndOldestMsgNo; // oldest position in the sender buffer
Expand Down Expand Up @@ -1407,6 +1512,7 @@ class CUDT
CRcvBuffer* m_pRcvBuffer; //< Receiver buffer
CRcvLossList* m_pRcvLossList; //< Receiver loss list
std::deque<CRcvFreshLoss> m_FreshLoss; //< Lost sequence already added to m_pRcvLossList, but not yet sent UMSG_LOSSREPORT for.
StatsLossRecord m_StatsLoss;
int m_iReorderTolerance; //< Current value of dynamic reorder tolerance
int m_iMaxReorderTolerance; //< Maximum allowed value for dynamic reorder tolerance
int m_iConsecEarlyDelivery; //< Increases with every OOO packet that came <TTL-2 time, resets with every increased reorder tolerance
Expand Down Expand Up @@ -1542,6 +1648,7 @@ class CUDT
int recvNAKTotal; // total number of received NAK packets
int sndDropTotal;
int rcvDropTotal;
int rcvReorderTotal;
uint64_t bytesSentTotal; // total number of bytes sent, including retransmissions
uint64_t bytesRecvTotal; // total number of received bytes
uint64_t rcvBytesLossTotal; // total number of loss bytes (estimate)
Expand Down Expand Up @@ -1571,6 +1678,7 @@ class CUDT
int traceSndDrop;
int traceRcvDrop;
int traceRcvRetrans;
int traceRcvReorder;
int traceReorderDistance;
double traceBelatedTime;
int64_t traceRcvBelated;
Expand Down
2 changes: 2 additions & 0 deletions srtcore/srt.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ struct CBytePerfMon
int pktRecvACKTotal; // total number of received ACK packets
int pktSentNAKTotal; // total number of sent NAK packets
int pktRecvNAKTotal; // total number of received NAK packets
int pktRcvReorderTotal; // total number of packets received in different order than sent
int64_t usSndDurationTotal; // total time duration when UDT is sending data (idle time exclusive)
//>new
int pktSndDropTotal; // number of too-late-to-send dropped packets
Expand Down Expand Up @@ -369,6 +370,7 @@ struct CBytePerfMon
int pktSndDrop; // number of too-late-to-send dropped packets
int pktRcvDrop; // number of too-late-to play missing packets
int pktRcvUndecrypt; // number of undecrypted packets
int pktRcvReorder; // number of packets received in different order than sent
uint64_t byteSent; // number of sent data bytes, including retransmissions
uint64_t byteRecv; // number of received bytes
#ifdef SRT_ENABLE_LOSTBYTESCOUNT
Expand Down
Loading