From aeeb43768a435d1782202b6de18addec6899bb4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Thu, 20 Apr 2023 10:29:54 +0200 Subject: [PATCH] Repeated fix for 2706 - fixes lacking mutex protection to some ACK-related fields --- srtcore/core.cpp | 110 ++++++++++++++++++++++++++--------------------- srtcore/core.h | 3 +- srtcore/sync.h | 4 ++ 3 files changed, 67 insertions(+), 50 deletions(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 7205f8574..9a6f91b79 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -6409,7 +6409,7 @@ int srt::CUDT::sndDropTooLate() m_iSndCurrSeqNo = minlastack; } - HLOGC(aslog.Debug, + HLOGC(qslog.Debug, log << CONID() << "SND-DROP: %(" << realack << "-" << m_iSndCurrSeqNo << ") n=" << dpkts << "pkt " << dbytes << "B, span=" << buffdelay_ms << " ms, FIRST #" << first_msgno); @@ -7315,6 +7315,8 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous) const int pktHdrSize = CPacket::HDR_SIZE + CPacket::UDP_HDR_SIZE; { + int32_t flight_span = getFlightSpan(); + ScopedLock statsguard(m_StatsLock); const steady_clock::time_point currtime = steady_clock::now(); @@ -7398,7 +7400,7 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous) perf->usPktSndPeriod = (double) count_microseconds(m_tdSendInterval.load()); perf->pktFlowWindow = m_iFlowWindowSize.load(); perf->pktCongestionWindow = (int)m_dCongestionWindow; - perf->pktFlightSize = getFlightSpan(); + perf->pktFlightSize = flight_span; perf->msRTT = (double)m_iSRTT / 1000.0; perf->msSndTsbPdDelay = m_bPeerTsbPd ? m_iPeerTsbPdDelay_ms : 0; perf->msRcvTsbPdDelay = isOPT_TsbPd() ? m_iTsbPdDelay_ms : 0; @@ -8286,9 +8288,9 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_ // Process a lite ACK if (isLiteAck) { + ScopedLock ack_lock(m_RecvAckLock); if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0) { - ScopedLock ack_lock(m_RecvAckLock); m_iFlowWindowSize = m_iFlowWindowSize - CSeqNo::seqoff(m_iSndLastAck, ackdata_seqno); m_iSndLastAck = ackdata_seqno; @@ -8846,8 +8848,10 @@ void srt::CUDT::processCtrlHS(const CPacket& ctrlpkt) // If createSrtHandshake failed, don't send anything. Actually it can only fail on IPE. // There is also no possible IPE condition in case of HSv4 - for this version it will always return true. - if (createSrtHandshake(SRT_CMD_HSRSP, SRT_CMD_KMRSP, kmdata, kmdatasize, - (response), (initdata))) + enterCS(m_ConnectionLock); + bool create_ok = createSrtHandshake(SRT_CMD_HSRSP, SRT_CMD_KMRSP, kmdata, kmdatasize, (response), (initdata)); + leaveCS(m_ConnectionLock); + if (create_ok) { response.m_iID = m_PeerID; setPacketTS(response, steady_clock::now()); @@ -9554,55 +9558,63 @@ bool srt::CUDT::packData(CPacket& w_packet, steady_clock::time_point& w_nexttime bool srt::CUDT::packUniqueData(CPacket& w_packet) { - // Check the congestion/flow window limit - const int cwnd = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow)); - const int flightspan = getFlightSpan(); - if (cwnd <= flightspan) - { - HLOGC(qslog.Debug, - log << CONID() << "packUniqueData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_dCongestionWindow - << ")=" << cwnd << " seqlen=(" << m_iSndLastAck << "-" << m_iSndCurrSeqNo << ")=" << flightspan); - return false; - } - - // XXX Here it's needed to set kflg to msgno_bitset in the block stored in the - // send buffer. This should be somehow avoided, the crypto flags should be set - // together with encrypting, and the packet should be sent as is, when rexmitting. - // It would be nice to research as to whether CSndBuffer::Block::m_iMsgNoBitset field - // isn't a useless redundant state copy. If it is, then taking the flags here can be removed. - const int kflg = m_pCryptoControl->getSndCryptoFlags(); - int pktskipseqno = 0; + int current_sequence_number; // reflexing variable + int kflg; time_point tsOrigin; - const int pld_size = m_pSndBuffer->readData((w_packet), (tsOrigin), kflg, (pktskipseqno)); - if (pktskipseqno) - { - // Some packets were skipped due to TTL expiry. - m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo, pktskipseqno); - HLOGC(qslog.Debug, log << "packUniqueData: reading skipped " << pktskipseqno << " seq up to %" << m_iSndCurrSeqNo - << " due to TTL expiry"); - } + int pld_size; - if (pld_size == 0) { - HLOGC(qslog.Debug, log << "packUniqueData: nothing extracted from the buffer"); - return false; - } + ScopedLock lkrack (m_RecvAckLock); + // Check the congestion/flow window limit + const int cwnd = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow)); + const int flightspan = getFlightSpan(); + if (cwnd <= flightspan) + { + HLOGC(qslog.Debug, + log << CONID() << "packUniqueData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_dCongestionWindow + << ")=" << cwnd << " seqlen=(" << m_iSndLastAck << "-" << m_iSndCurrSeqNo << ")=" << flightspan); + return false; + } - // A CHANGE. The sequence number is currently added to the packet - // when scheduling, not when extracting. This is a inter-migration form, - // only override extraction sequence with scheduling sequence in group mode. - m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo); + // XXX Here it's needed to set kflg to msgno_bitset in the block stored in the + // send buffer. This should be somehow avoided, the crypto flags should be set + // together with encrypting, and the packet should be sent as is, when rexmitting. + // It would be nice to research as to whether CSndBuffer::Block::m_iMsgNoBitset field + // isn't a useless redundant state copy. If it is, then taking the flags here can be removed. + kflg = m_pCryptoControl->getSndCryptoFlags(); + int pktskipseqno = 0; + pld_size = m_pSndBuffer->readData((w_packet), (tsOrigin), kflg, (pktskipseqno)); + if (pktskipseqno) + { + // Some packets were skipped due to TTL expiry. + m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo, pktskipseqno); + HLOGC(qslog.Debug, log << "packUniqueData: reading skipped " << pktskipseqno << " seq up to %" << m_iSndCurrSeqNo + << " due to TTL expiry"); + } + + if (pld_size == 0) + { + HLOGC(qslog.Debug, log << "packUniqueData: nothing extracted from the buffer"); + return false; + } + + // A CHANGE. The sequence number is currently added to the packet + // when scheduling, not when extracting. This is a inter-migration form, + // only override extraction sequence with scheduling sequence in group mode. + m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo); + current_sequence_number = m_iSndCurrSeqNo; + } #if ENABLE_BONDING // Fortunately the group itself isn't being accessed. if (m_parent->m_GroupOf) { - const int packetspan = CSeqNo::seqoff(m_iSndCurrSeqNo, w_packet.m_iSeqNo); + const int packetspan = CSeqNo::seqoff(current_sequence_number, w_packet.m_iSeqNo); if (packetspan > 0) { // After increasing by 1, but being previously set as ISN-1, this should be == ISN, // if this is the very first packet to send. - if (m_iSndCurrSeqNo == m_iISN) + if (current_sequence_number == m_iISN) { // This is the very first packet to be sent; so there's nothing in // the sending buffer yet, and therefore we are in a situation as just @@ -9610,7 +9622,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) // no ACK to be awaited. We can screw up all the variables that are // initialized from ISN just after connection. LOGC(qslog.Note, - log << CONID() << "packUniqueData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo + log << CONID() << "packUniqueData: Fixing EXTRACTION sequence " << current_sequence_number << " from SCHEDULING sequence " << w_packet.m_iSeqNo << " for the first packet: DIFF=" << packetspan << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength())); } @@ -9618,7 +9630,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) { // There will be a serious data discrepancy between the agent and the peer. LOGC(qslog.Error, - log << CONID() << "IPE: packUniqueData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo + log << CONID() << "IPE: packUniqueData: Fixing EXTRACTION sequence " << current_sequence_number << " from SCHEDULING sequence " << w_packet.m_iSeqNo << " in the middle of transition: DIFF=" << packetspan << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength())); } @@ -9627,7 +9639,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) // won't stupidly request the packets to be retransmitted. // Don't do it if the difference isn't positive or exceeds the threshold. int32_t seqpair[2]; - seqpair[0] = m_iSndCurrSeqNo; + seqpair[0] = current_sequence_number; seqpair[1] = CSeqNo::decseq(w_packet.m_iSeqNo); const int32_t no_msgno = 0; LOGC(qslog.Debug, @@ -9639,8 +9651,8 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) // packet are not present in the buffer (preadte the send buffer). // Override extraction sequence with scheduling sequence. - m_iSndCurrSeqNo = w_packet.m_iSeqNo; ScopedLock ackguard(m_RecvAckLock); + m_iSndCurrSeqNo = w_packet.m_iSeqNo; m_iSndLastAck = w_packet.m_iSeqNo; m_iSndLastDataAck = w_packet.m_iSeqNo; m_iSndLastFullAck = w_packet.m_iSeqNo; @@ -9650,7 +9662,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) { LOGC(qslog.Error, log << CONID() << "IPE: packData: SCHEDULING sequence " << w_packet.m_iSeqNo - << " is behind of EXTRACTION sequence " << m_iSndCurrSeqNo << ", dropping this packet: DIFF=" + << " is behind of EXTRACTION sequence " << current_sequence_number << ", dropping this packet: DIFF=" << packetspan << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength())); // XXX: Probably also change the socket state to broken? return false; @@ -9660,12 +9672,12 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) #endif { HLOGC(qslog.Debug, - log << CONID() << "packUniqueData: Applying EXTRACTION sequence " << m_iSndCurrSeqNo + log << CONID() << "packUniqueData: Applying EXTRACTION sequence " << current_sequence_number << " over SCHEDULING sequence " << w_packet.m_iSeqNo << " for socket not in group:" - << " DIFF=" << CSeqNo::seqcmp(m_iSndCurrSeqNo, w_packet.m_iSeqNo) + << " DIFF=" << CSeqNo::seqcmp(current_sequence_number, w_packet.m_iSeqNo) << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength())); // Do this always when not in a group. - w_packet.m_iSeqNo = m_iSndCurrSeqNo; + w_packet.m_iSeqNo = current_sequence_number; } // Set missing fields before encrypting the packet, because those fields might be used for encryption. diff --git a/srtcore/core.h b/srtcore/core.h index e7ca57c50..7a6dc088e 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -865,6 +865,7 @@ class CUDT // require only the lost sequence number, and how to find the packet with this sequence // will be up to the sending buffer. sync::atomic m_iSndLastDataAck; // The real last ACK that updates the sender buffer and loss list + SRT_ATTR_GUARDED_BY(m_RecvAckLock) sync::atomic m_iSndCurrSeqNo; // The largest sequence number that HAS BEEN SENT sync::atomic m_iSndNextSeqNo; // The sequence number predicted to be placed at the currently scheduled packet @@ -975,7 +976,7 @@ class CUDT mutable sync::Mutex m_RcvBufferLock; // Protects the state of the m_pRcvBuffer // Protects access to m_iSndCurrSeqNo, m_iSndLastAck - sync::Mutex m_RecvAckLock; // Protects the state changes while processing incoming ACK (SRT_EPOLL_OUT) + mutable sync::Mutex m_RecvAckLock; // Protects the state changes while processing incoming ACK (SRT_EPOLL_OUT) sync::Condition m_RecvDataCond; // used to block "srt_recv*" when there is no data. Use together with m_RecvLock sync::Mutex m_RecvLock; // used to synchronize "srt_recv*" call, protects TSBPD drift updates (CRcvBuffer::isRcvDataReady()) diff --git a/srtcore/sync.h b/srtcore/sync.h index 87be6f458..74b38f028 100644 --- a/srtcore/sync.h +++ b/srtcore/sync.h @@ -669,6 +669,7 @@ class CUniqueSync: public CSync UniqueLock& locker() { return m_ulock; } + SRT_ATTR_ACQUIRE(this->m_ulock.mutex()) CUniqueSync(Mutex& mut, Condition& cnd) : CSync(cnd, m_ulock) , m_ulock(mut) @@ -681,6 +682,9 @@ class CUniqueSync: public CSync { } + SRT_ATTR_RELEASE(this->m_ulock.mutex()) + ~CUniqueSync() {} + // These functions can be used safely because // this whole class guarantees that whatever happens // while its object exists is that the mutex is locked.