From 744035b2a41d17793d1596c79dc60d8add9ca5d4 Mon Sep 17 00:00:00 2001 From: yomnes0 <127947185+yomnes0@users.noreply.github.com> Date: Tue, 8 Aug 2023 17:08:10 +0200 Subject: [PATCH] [core] Fix hang up on not enough space in the RCV buffer (#2745). When there is space available in the receiving buffer after it is full, send an ack to allow the sender to resume transmission. Reschedule sending if ACK decreases the flight span after sending is congested. Co-authored-by: Maxim Sharabayko --- srtcore/core.cpp | 51 +++++++++++++++++++++++++++--------------------- srtcore/core.h | 2 +- 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index d58aa2478..bcc105001 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -7908,7 +7908,6 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) SRT_ASSERT(ctrlpkt.getMsgTimeStamp() != 0); int nbsent = 0; int local_prevack = 0; - #if ENABLE_HEAVY_LOGGING struct SaveBack { @@ -7931,21 +7930,22 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) // The TSBPD thread may change the first lost sequence record (TLPKTDROP). // To avoid it the m_RcvBufferLock has to be acquired. UniqueLock bufflock(m_RcvBufferLock); - + // The full ACK should be sent to indicate there is now available space in the RCV buffer + // since the last full ACK. It should unblock the sender to proceed further. + const bool bNeedFullAck = (m_bBufferWasFull && getAvailRcvBufferSizeNoLock() > 0); int32_t ack; // First unacknowledged packet sequence number (acknowledge up to ack). if (!getFirstNoncontSequence((ack), (reason))) return nbsent; - if (m_iRcvLastAckAck == ack) + if (m_iRcvLastAckAck == ack && !bNeedFullAck) { - HLOGC(xtlog.Debug, - log << CONID() << "sendCtrl(UMSG_ACK): last ACK %" << ack << "(" << reason << ") == last ACKACK"); - return nbsent; + HLOGC(xtlog.Debug, + log << CONID() << "sendCtrl(UMSG_ACK): last ACK %" << ack << "(" << reason << ") == last ACKACK"); + return nbsent; } - // send out a lite ACK // to save time on buffer processing and bandwidth/AS measurement, a lite ACK only feeds back an ACK number - if (size == SEND_LITE_ACK) + if (size == SEND_LITE_ACK && !bNeedFullAck) { bufflock.unlock(); ctrlpkt.pack(UMSG_ACK, NULL, &ack, size); @@ -8083,7 +8083,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) CGlobEvent::triggerEvent(); } } - else if (ack == m_iRcvLastAck) + else if (ack == m_iRcvLastAck && !bNeedFullAck) { // If the ACK was just sent already AND elapsed time did not exceed RTT, if ((steady_clock::now() - m_tsLastAckTime) < @@ -8094,7 +8094,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) return nbsent; } } - else + else if (!bNeedFullAck) { // Not possible (m_iRcvCurrSeqNo+1 <% m_iRcvLastAck ?) LOGC(xtlog.Error, log << CONID() << "sendCtrl(UMSG_ACK): IPE: curr %" << ack << " <% last %" << m_iRcvLastAck); @@ -8105,7 +8105,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) // [[using locked(m_RcvBufferLock)]]; // Send out the ACK only if has not been received by the sender before - if (CSeqNo::seqcmp(m_iRcvLastAck, m_iRcvLastAckAck) > 0) + if (CSeqNo::seqcmp(m_iRcvLastAck, m_iRcvLastAckAck) > 0 || bNeedFullAck) { // NOTE: The BSTATS feature turns on extra fields above size 6 // also known as ACKD_TOTAL_SIZE_VER100. @@ -8121,10 +8121,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) data[ACKD_RTT] = m_iSRTT; data[ACKD_RTTVAR] = m_iRTTVar; data[ACKD_BUFFERLEFT] = (int) getAvailRcvBufferSizeNoLock(); - // a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock - if (data[ACKD_BUFFERLEFT] < 2) - data[ACKD_BUFFERLEFT] = 2; - + m_bBufferWasFull = data[ACKD_BUFFERLEFT] == 0; if (steady_clock::now() - m_tsLastAckTime > m_tdACKInterval) { int rcvRate; @@ -8299,7 +8296,6 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_ m_tsLastRspAckTime = currtime; m_iReXmitCount = 1; // Reset re-transmit count since last ACK } - return; } @@ -8340,14 +8336,25 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_ return; } - if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0) + if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0) + { + const int cwnd1 = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow)); + const bool bWasStuck = cwnd1<= getFlightSpan(); + // Update Flow Window Size, must update before and together with m_iSndLastAck + m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT]; + m_iSndLastAck = ackdata_seqno; + m_tsLastRspAckTime = currtime; + m_iReXmitCount = 1; // Reset re-transmit count since last ACK + + const int cwnd = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow)); + if (bWasStuck && cwnd > getFlightSpan()) { - // Update Flow Window Size, must update before and together with m_iSndLastAck - m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT]; - m_iSndLastAck = ackdata_seqno; - m_tsLastRspAckTime = currtime; - m_iReXmitCount = 1; // Reset re-transmit count since last ACK + m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE); + HLOGC(gglog.Debug, + log << CONID() << "processCtrlAck: could reschedule SND. iFlowWindowSize " << m_iFlowWindowSize + << " SPAN " << getFlightSpan() << " ackdataseqno %" << ackdata_seqno); } + } /* * We must not ignore full ack received by peer diff --git a/srtcore/core.h b/srtcore/core.h index b83dec6e5..c1d8e64e8 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -937,7 +937,7 @@ class CUDT int32_t m_iAckSeqNo; // Last ACK sequence number sync::atomic m_iRcvCurrSeqNo; // (RCV) Largest received sequence number. RcvQTh, TSBPDTh. int32_t m_iRcvCurrPhySeqNo; // Same as m_iRcvCurrSeqNo, but physical only (disregarding a filter) - + bool m_bBufferWasFull; // Indicate that RX buffer was full last time a ack was sent int32_t m_iPeerISN; // Initial Sequence Number of the peer side uint32_t m_uPeerSrtVersion;