Skip to content

Commit

Permalink
[core] Fix hang up on not enough space in the RCV buffer (#2745).
Browse files Browse the repository at this point in the history
When there is space available in the receiving buffer after it is full,
send an ack to allow the sender to resume transmission.
Reschedule sending if ACK decreases the flight span after sending is congested.

Co-authored-by: Maxim Sharabayko <[email protected]>
  • Loading branch information
yomnes0 and maxsharabayko authored Aug 8, 2023
1 parent c6572bf commit 744035b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 23 deletions.
51 changes: 29 additions & 22 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7908,7 +7908,6 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
SRT_ASSERT(ctrlpkt.getMsgTimeStamp() != 0);
int nbsent = 0;
int local_prevack = 0;

#if ENABLE_HEAVY_LOGGING
struct SaveBack
{
Expand All @@ -7931,21 +7930,22 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
// The TSBPD thread may change the first lost sequence record (TLPKTDROP).
// To avoid it the m_RcvBufferLock has to be acquired.
UniqueLock bufflock(m_RcvBufferLock);

// The full ACK should be sent to indicate there is now available space in the RCV buffer
// since the last full ACK. It should unblock the sender to proceed further.
const bool bNeedFullAck = (m_bBufferWasFull && getAvailRcvBufferSizeNoLock() > 0);
int32_t ack; // First unacknowledged packet sequence number (acknowledge up to ack).
if (!getFirstNoncontSequence((ack), (reason)))
return nbsent;

if (m_iRcvLastAckAck == ack)
if (m_iRcvLastAckAck == ack && !bNeedFullAck)
{
HLOGC(xtlog.Debug,
log << CONID() << "sendCtrl(UMSG_ACK): last ACK %" << ack << "(" << reason << ") == last ACKACK");
return nbsent;
HLOGC(xtlog.Debug,
log << CONID() << "sendCtrl(UMSG_ACK): last ACK %" << ack << "(" << reason << ") == last ACKACK");
return nbsent;
}

// send out a lite ACK
// to save time on buffer processing and bandwidth/AS measurement, a lite ACK only feeds back an ACK number
if (size == SEND_LITE_ACK)
if (size == SEND_LITE_ACK && !bNeedFullAck)
{
bufflock.unlock();
ctrlpkt.pack(UMSG_ACK, NULL, &ack, size);
Expand Down Expand Up @@ -8083,7 +8083,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
CGlobEvent::triggerEvent();
}
}
else if (ack == m_iRcvLastAck)
else if (ack == m_iRcvLastAck && !bNeedFullAck)
{
// If the ACK was just sent already AND elapsed time did not exceed RTT,
if ((steady_clock::now() - m_tsLastAckTime) <
Expand All @@ -8094,7 +8094,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
return nbsent;
}
}
else
else if (!bNeedFullAck)
{
// Not possible (m_iRcvCurrSeqNo+1 <% m_iRcvLastAck ?)
LOGC(xtlog.Error, log << CONID() << "sendCtrl(UMSG_ACK): IPE: curr %" << ack << " <% last %" << m_iRcvLastAck);
Expand All @@ -8105,7 +8105,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
// [[using locked(m_RcvBufferLock)]];

// Send out the ACK only if has not been received by the sender before
if (CSeqNo::seqcmp(m_iRcvLastAck, m_iRcvLastAckAck) > 0)
if (CSeqNo::seqcmp(m_iRcvLastAck, m_iRcvLastAckAck) > 0 || bNeedFullAck)
{
// NOTE: The BSTATS feature turns on extra fields above size 6
// also known as ACKD_TOTAL_SIZE_VER100.
Expand All @@ -8121,10 +8121,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
data[ACKD_RTT] = m_iSRTT;
data[ACKD_RTTVAR] = m_iRTTVar;
data[ACKD_BUFFERLEFT] = (int) getAvailRcvBufferSizeNoLock();
// a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock
if (data[ACKD_BUFFERLEFT] < 2)
data[ACKD_BUFFERLEFT] = 2;

m_bBufferWasFull = data[ACKD_BUFFERLEFT] == 0;
if (steady_clock::now() - m_tsLastAckTime > m_tdACKInterval)
{
int rcvRate;
Expand Down Expand Up @@ -8299,7 +8296,6 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_
m_tsLastRspAckTime = currtime;
m_iReXmitCount = 1; // Reset re-transmit count since last ACK
}

return;
}

Expand Down Expand Up @@ -8340,14 +8336,25 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_
return;
}

if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0)
if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0)
{
const int cwnd1 = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow));
const bool bWasStuck = cwnd1<= getFlightSpan();
// Update Flow Window Size, must update before and together with m_iSndLastAck
m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT];
m_iSndLastAck = ackdata_seqno;
m_tsLastRspAckTime = currtime;
m_iReXmitCount = 1; // Reset re-transmit count since last ACK

const int cwnd = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow));
if (bWasStuck && cwnd > getFlightSpan())
{
// Update Flow Window Size, must update before and together with m_iSndLastAck
m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT];
m_iSndLastAck = ackdata_seqno;
m_tsLastRspAckTime = currtime;
m_iReXmitCount = 1; // Reset re-transmit count since last ACK
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);
HLOGC(gglog.Debug,
log << CONID() << "processCtrlAck: could reschedule SND. iFlowWindowSize " << m_iFlowWindowSize
<< " SPAN " << getFlightSpan() << " ackdataseqno %" << ackdata_seqno);
}
}

/*
* We must not ignore full ack received by peer
Expand Down
2 changes: 1 addition & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,7 @@ class CUDT
int32_t m_iAckSeqNo; // Last ACK sequence number
sync::atomic<int32_t> m_iRcvCurrSeqNo; // (RCV) Largest received sequence number. RcvQTh, TSBPDTh.
int32_t m_iRcvCurrPhySeqNo; // Same as m_iRcvCurrSeqNo, but physical only (disregarding a filter)

bool m_bBufferWasFull; // Indicate that RX buffer was full last time a ack was sent
int32_t m_iPeerISN; // Initial Sequence Number of the peer side

uint32_t m_uPeerSrtVersion;
Expand Down

0 comments on commit 744035b

Please sign in to comment.