From f6c231588da17f5bf91f82be7122664d072a93cc Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Mon, 22 Apr 2024 10:09:44 +0200 Subject: [PATCH 1/2] [core] Fixed stats counting packets dropped by a group (#2934). Also fixed m_iAvgPayloadSize initialization. --- srtcore/group.cpp | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/srtcore/group.cpp b/srtcore/group.cpp index 1539245a0..52b31d29f 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -259,6 +259,7 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype) , m_uOPT_MinStabilityTimeout_us(1000 * CSrtConfig::COMM_DEF_MIN_STABILITY_TIMEOUT_MS) // -1 = "undefined"; will become defined with first added socket , m_iMaxPayloadSize(-1) + , m_iAvgPayloadSize(-1) , m_bSynRecving(true) , m_bSynSending(true) , m_bTsbPd(true) @@ -2309,6 +2310,19 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) } fillGroupData((w_mc), w_mc); + // TODO: What if a drop happens before the very first packet was read? Maybe set to ISN? + if (m_RcvBaseSeqNo != SRT_SEQNO_NONE) + { + const int32_t iNumDropped = (CSeqNo(w_mc.pktseq) - CSeqNo(m_RcvBaseSeqNo)) - 1; + if (iNumDropped > 0) + { + m_stats.recvDrop.count(stats::BytesPackets(iNumDropped * static_cast(avgRcvPacketSize()), iNumDropped)); + LOGC(grlog.Warn, + log << "@" << m_GroupID << " GROUP RCV-DROPPED " << iNumDropped << " packet(s): seqno %" + << m_RcvBaseSeqNo << " to %" << w_mc.pktseq); + } + } + HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": Update m_RcvBaseSeqNo: %" << m_RcvBaseSeqNo << " -> %" << w_mc.pktseq); m_RcvBaseSeqNo = w_mc.pktseq; From 4f925fbca586c0d39bf80febd9d8315a8092f1f7 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Mon, 22 Apr 2024 10:11:25 +0200 Subject: [PATCH 2/2] [core] Stats: do not count discarded packets as dropped (#2932). Valid for a broadcast group member dropping existing packets from the RCV buffer because they were read from another member. Co-authored-by: Sektor van Skijlen --- srtcore/buffer_rcv.cpp | 18 +++++++++++++----- srtcore/buffer_rcv.h | 4 ++-- srtcore/core.cpp | 19 +++++++++++++------ srtcore/core.h | 9 ++++++++- srtcore/group.cpp | 2 +- 5 files changed, 37 insertions(+), 15 deletions(-) diff --git a/srtcore/buffer_rcv.cpp b/srtcore/buffer_rcv.cpp index fb389e4be..2ec42487d 100644 --- a/srtcore/buffer_rcv.cpp +++ b/srtcore/buffer_rcv.cpp @@ -206,7 +206,7 @@ int CRcvBuffer::insert(CUnit* unit) return 0; } -int CRcvBuffer::dropUpTo(int32_t seqno) +std::pair CRcvBuffer::dropUpTo(int32_t seqno) { IF_RCVBUF_DEBUG(ScopedLog scoped_log); IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBuffer::dropUpTo: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo); @@ -215,16 +215,23 @@ int CRcvBuffer::dropUpTo(int32_t seqno) if (len <= 0) { IF_RCVBUF_DEBUG(scoped_log.ss << ". Nothing to drop."); - return 0; + return std::make_pair(0, 0); } m_iMaxPosOff -= len; if (m_iMaxPosOff < 0) m_iMaxPosOff = 0; - const int iDropCnt = len; + int iNumDropped = 0; // Number of dropped packets that were missing. + int iNumDiscarded = 0; // The number of dropped packets that existed in the buffer. while (len > 0) { + // Note! Dropping a EntryState_Read must not be counted as a drop because it was read. + // Note! Dropping a EntryState_Drop must not be counted as a drop because it was already dropped and counted earlier. + if (m_entries[m_iStartPos].status == EntryState_Avail) + ++iNumDiscarded; + else if (m_entries[m_iStartPos].status == EntryState_Empty) + ++iNumDropped; dropUnitInPos(m_iStartPos); m_entries[m_iStartPos].status = EntryState_Empty; SRT_ASSERT(m_entries[m_iStartPos].pUnit == NULL && m_entries[m_iStartPos].status == EntryState_Empty); @@ -246,7 +253,7 @@ int CRcvBuffer::dropUpTo(int32_t seqno) } if (!m_tsbpd.isEnabled() && m_bMessageAPI) updateFirstReadableOutOfOrder(); - return iDropCnt; + return std::make_pair(iNumDropped, iNumDiscarded); } int CRcvBuffer::dropAll() @@ -255,7 +262,8 @@ int CRcvBuffer::dropAll() return 0; const int end_seqno = CSeqNo::incseq(m_iStartSeqNo, m_iMaxPosOff); - return dropUpTo(end_seqno); + const std::pair numDropped = dropUpTo(end_seqno); + return numDropped.first + numDropped.second; } int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, DropActionIfExists actionOnExisting) diff --git a/srtcore/buffer_rcv.h b/srtcore/buffer_rcv.h index d664373f5..3c60be21d 100644 --- a/srtcore/buffer_rcv.h +++ b/srtcore/buffer_rcv.h @@ -66,8 +66,8 @@ class CRcvBuffer /// Drop packets in the receiver buffer from the current position up to the seqno (excluding seqno). /// @param [in] seqno drop units up to this sequence number - /// @return number of dropped packets. - int dropUpTo(int32_t seqno); + /// @return number of dropped (missing) and discarded (available) packets as a pair(dropped, discarded). + std::pair dropUpTo(int32_t seqno); /// @brief Drop all the packets in the receiver buffer. /// The starting position and seqno are shifted right after the last packet in the buffer. diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 4a8ce550e..6752d7e9f 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -5576,7 +5576,7 @@ void * srt::CUDT::tsbpd(void* param) return NULL; } -int srt::CUDT::rcvDropTooLateUpTo(int seqno) +int srt::CUDT::rcvDropTooLateUpTo(int seqno, DropReason reason) { // Make sure that it would not drop over m_iRcvCurrSeqNo, which may break senders. if (CSeqNo::seqcmp(seqno, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0) @@ -5584,16 +5584,22 @@ int srt::CUDT::rcvDropTooLateUpTo(int seqno) dropFromLossLists(SRT_SEQNO_NONE, CSeqNo::decseq(seqno)); - const int iDropCnt = m_pRcvBuffer->dropUpTo(seqno); - if (iDropCnt > 0) + const std::pair iDropDiscardedPkts = m_pRcvBuffer->dropUpTo(seqno); + const int iDropCnt = iDropDiscardedPkts.first; + const int iDiscardedCnt = iDropDiscardedPkts.second; + const int iDropCntTotal = iDropCnt + iDiscardedCnt; + + // In case of DROP_TOO_LATE discarded packets should also be counted because they are not read from another member socket. + const int iDropStatCnt = (reason == DROP_DISCARD) ? iDropCnt : iDropCntTotal; + if (iDropStatCnt > 0) { enterCS(m_StatsLock); // Estimate dropped bytes from average payload size. const uint64_t avgpayloadsz = m_pRcvBuffer->getRcvAvgPayloadSize(); - m_stats.rcvr.dropped.count(stats::BytesPackets(iDropCnt * avgpayloadsz, (uint32_t) iDropCnt)); + m_stats.rcvr.dropped.count(stats::BytesPackets(iDropStatCnt * avgpayloadsz, (uint32_t)iDropStatCnt)); leaveCS(m_StatsLock); } - return iDropCnt; + return iDropCntTotal; } void srt::CUDT::setInitialRcvSeq(int32_t isn) @@ -7835,7 +7841,7 @@ void srt::CUDT::dropToGroupRecvBase() return; ScopedLock lck(m_RcvBufferLock); - int cnt = rcvDropTooLateUpTo(CSeqNo::incseq(group_recv_base)); + const int cnt = rcvDropTooLateUpTo(CSeqNo::incseq(group_recv_base), DROP_DISCARD); if (cnt > 0) { HLOGC(grlog.Debug, @@ -8063,6 +8069,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) string reason; // just for "a reason" of giving particular % for ACK #if ENABLE_BONDING + // TODO: The group drops other members upon reading, maybe no longer needed here? dropToGroupRecvBase(); #endif diff --git a/srtcore/core.h b/srtcore/core.h index 10746c8c9..73b053197 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -755,11 +755,18 @@ class CUDT // TSBPD thread main function. static void* tsbpd(void* param); + enum DropReason + { + DROP_TOO_LATE, //< Drop to keep up to the live pace (TLPKTDROP). + DROP_DISCARD //< Drop because another group member already provided these packets. + }; + /// Drop too late packets (receiver side). Update loss lists and ACK positions. /// The @a seqno packet itself is not dropped. /// @param seqno [in] The sequence number of the first packets following those to be dropped. + /// @param reason A reason for dropping (see @a DropReason). /// @return The number of packets dropped. - int rcvDropTooLateUpTo(int seqno); + int rcvDropTooLateUpTo(int seqno, DropReason reason = DROP_TOO_LATE); static loss_seqs_t defaultPacketArrival(void* vself, CPacket& pkt); static loss_seqs_t groupPacketArrival(void* vself, CPacket& pkt); diff --git a/srtcore/group.cpp b/srtcore/group.cpp index 52b31d29f..282e8f25a 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -2338,7 +2338,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) ScopedLock lg(ps->core().m_RcvBufferLock); if (m_RcvBaseSeqNo != SRT_SEQNO_NONE) { - const int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo)); + const int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo), CUDT::DROP_DISCARD); if (cnt > 0) { HLOGC(grlog.Debug,