Skip to content

Commit

Permalink
Merge branch 'master' into develop/remove-drop-to-grp-rcv-base
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko authored Apr 22, 2024
2 parents 49b8dea + 4f925fb commit 591603d
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 14 deletions.
18 changes: 13 additions & 5 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ int CRcvBuffer::insert(CUnit* unit)
return 0;
}

int CRcvBuffer::dropUpTo(int32_t seqno)
std::pair<int, int> CRcvBuffer::dropUpTo(int32_t seqno)
{
IF_RCVBUF_DEBUG(ScopedLog scoped_log);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBuffer::dropUpTo: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo);
Expand All @@ -215,16 +215,23 @@ int CRcvBuffer::dropUpTo(int32_t seqno)
if (len <= 0)
{
IF_RCVBUF_DEBUG(scoped_log.ss << ". Nothing to drop.");
return 0;
return std::make_pair(0, 0);
}

m_iMaxPosOff -= len;
if (m_iMaxPosOff < 0)
m_iMaxPosOff = 0;

const int iDropCnt = len;
int iNumDropped = 0; // Number of dropped packets that were missing.
int iNumDiscarded = 0; // The number of dropped packets that existed in the buffer.
while (len > 0)
{
// Note! Dropping a EntryState_Read must not be counted as a drop because it was read.
// Note! Dropping a EntryState_Drop must not be counted as a drop because it was already dropped and counted earlier.
if (m_entries[m_iStartPos].status == EntryState_Avail)
++iNumDiscarded;
else if (m_entries[m_iStartPos].status == EntryState_Empty)
++iNumDropped;
dropUnitInPos(m_iStartPos);
m_entries[m_iStartPos].status = EntryState_Empty;
SRT_ASSERT(m_entries[m_iStartPos].pUnit == NULL && m_entries[m_iStartPos].status == EntryState_Empty);
Expand All @@ -246,7 +253,7 @@ int CRcvBuffer::dropUpTo(int32_t seqno)
}
if (!m_tsbpd.isEnabled() && m_bMessageAPI)
updateFirstReadableOutOfOrder();
return iDropCnt;
return std::make_pair(iNumDropped, iNumDiscarded);
}

int CRcvBuffer::dropAll()
Expand All @@ -255,7 +262,8 @@ int CRcvBuffer::dropAll()
return 0;

const int end_seqno = CSeqNo::incseq(m_iStartSeqNo, m_iMaxPosOff);
return dropUpTo(end_seqno);
const std::pair<int, int> numDropped = dropUpTo(end_seqno);
return numDropped.first + numDropped.second;
}

int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, DropActionIfExists actionOnExisting)
Expand Down
4 changes: 2 additions & 2 deletions srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ class CRcvBuffer

/// Drop packets in the receiver buffer from the current position up to the seqno (excluding seqno).
/// @param [in] seqno drop units up to this sequence number
/// @return number of dropped packets.
int dropUpTo(int32_t seqno);
/// @return number of dropped (missing) and discarded (available) packets as a pair(dropped, discarded).
std::pair<int, int> dropUpTo(int32_t seqno);

/// @brief Drop all the packets in the receiver buffer.
/// The starting position and seqno are shifted right after the last packet in the buffer.
Expand Down
16 changes: 11 additions & 5 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5576,24 +5576,30 @@ void * srt::CUDT::tsbpd(void* param)
return NULL;
}

int srt::CUDT::rcvDropTooLateUpTo(int seqno)
int srt::CUDT::rcvDropTooLateUpTo(int seqno, DropReason reason)
{
// Make sure that it would not drop over m_iRcvCurrSeqNo, which may break senders.
if (CSeqNo::seqcmp(seqno, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0)
seqno = CSeqNo::incseq(m_iRcvCurrSeqNo);

dropFromLossLists(SRT_SEQNO_NONE, CSeqNo::decseq(seqno));

const int iDropCnt = m_pRcvBuffer->dropUpTo(seqno);
if (iDropCnt > 0)
const std::pair<int, int> iDropDiscardedPkts = m_pRcvBuffer->dropUpTo(seqno);
const int iDropCnt = iDropDiscardedPkts.first;
const int iDiscardedCnt = iDropDiscardedPkts.second;
const int iDropCntTotal = iDropCnt + iDiscardedCnt;

// In case of DROP_TOO_LATE discarded packets should also be counted because they are not read from another member socket.
const int iDropStatCnt = (reason == DROP_DISCARD) ? iDropCnt : iDropCntTotal;
if (iDropStatCnt > 0)
{
enterCS(m_StatsLock);
// Estimate dropped bytes from average payload size.
const uint64_t avgpayloadsz = m_pRcvBuffer->getRcvAvgPayloadSize();
m_stats.rcvr.dropped.count(stats::BytesPackets(iDropCnt * avgpayloadsz, (uint32_t) iDropCnt));
m_stats.rcvr.dropped.count(stats::BytesPackets(iDropStatCnt * avgpayloadsz, (uint32_t)iDropStatCnt));
leaveCS(m_StatsLock);
}
return iDropCnt;
return iDropCntTotal;
}

void srt::CUDT::setInitialRcvSeq(int32_t isn)
Expand Down
9 changes: 8 additions & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -755,11 +755,18 @@ class CUDT
// TSBPD thread main function.
static void* tsbpd(void* param);

enum DropReason
{
DROP_TOO_LATE, //< Drop to keep up to the live pace (TLPKTDROP).
DROP_DISCARD //< Drop because another group member already provided these packets.
};

/// Drop too late packets (receiver side). Update loss lists and ACK positions.
/// The @a seqno packet itself is not dropped.
/// @param seqno [in] The sequence number of the first packets following those to be dropped.
/// @param reason A reason for dropping (see @a DropReason).
/// @return The number of packets dropped.
int rcvDropTooLateUpTo(int seqno);
int rcvDropTooLateUpTo(int seqno, DropReason reason = DROP_TOO_LATE);

static loss_seqs_t defaultPacketArrival(void* vself, CPacket& pkt);
static loss_seqs_t groupPacketArrival(void* vself, CPacket& pkt);
Expand Down
16 changes: 15 additions & 1 deletion srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype)
, m_uOPT_MinStabilityTimeout_us(1000 * CSrtConfig::COMM_DEF_MIN_STABILITY_TIMEOUT_MS)
// -1 = "undefined"; will become defined with first added socket
, m_iMaxPayloadSize(-1)
, m_iAvgPayloadSize(-1)
, m_bSynRecving(true)
, m_bSynSending(true)
, m_bTsbPd(true)
Expand Down Expand Up @@ -2309,6 +2310,19 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
}
fillGroupData((w_mc), w_mc);

// TODO: What if a drop happens before the very first packet was read? Maybe set to ISN?
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
{
const int32_t iNumDropped = (CSeqNo(w_mc.pktseq) - CSeqNo(m_RcvBaseSeqNo)) - 1;
if (iNumDropped > 0)
{
m_stats.recvDrop.count(stats::BytesPackets(iNumDropped * static_cast<uint64_t>(avgRcvPacketSize()), iNumDropped));
LOGC(grlog.Warn,
log << "@" << m_GroupID << " GROUP RCV-DROPPED " << iNumDropped << " packet(s): seqno %"
<< m_RcvBaseSeqNo << " to %" << w_mc.pktseq);
}
}

HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": Update m_RcvBaseSeqNo: %" << m_RcvBaseSeqNo << " -> %" << w_mc.pktseq);
m_RcvBaseSeqNo = w_mc.pktseq;
Expand All @@ -2324,7 +2338,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
ScopedLock lg(ps->core().m_RcvBufferLock);
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
{
const int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));
const int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo), CUDT::DROP_DISCARD);
if (cnt > 0)
{
HLOGC(grlog.Debug,
Expand Down

0 comments on commit 591603d

Please sign in to comment.