Skip to content

Commit

Permalink
[core] Fix lacking mutex protection of some ACK-related fields (#2723).
Browse files Browse the repository at this point in the history
Repeated fix for #2706.
  • Loading branch information
ethouris committed Sep 19, 2023
1 parent 37e6588 commit 6c723e5
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 50 deletions.
110 changes: 61 additions & 49 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6433,7 +6433,7 @@ int srt::CUDT::sndDropTooLate()
m_iSndCurrSeqNo = minlastack;
}

HLOGC(aslog.Debug,
HLOGC(qslog.Debug,
log << CONID() << "SND-DROP: %(" << realack << "-" << m_iSndCurrSeqNo << ") n=" << dpkts << "pkt " << dbytes
<< "B, span=" << buffdelay_ms << " ms, FIRST #" << first_msgno);

Expand Down Expand Up @@ -7343,6 +7343,8 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)

const int pktHdrSize = CPacket::HDR_SIZE + CPacket::UDP_HDR_SIZE;
{
int32_t flight_span = getFlightSpan();

ScopedLock statsguard(m_StatsLock);

const steady_clock::time_point currtime = steady_clock::now();
Expand Down Expand Up @@ -7426,7 +7428,7 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)
perf->usPktSndPeriod = (double) count_microseconds(m_tdSendInterval.load());
perf->pktFlowWindow = m_iFlowWindowSize.load();
perf->pktCongestionWindow = (int)m_dCongestionWindow;
perf->pktFlightSize = getFlightSpan();
perf->pktFlightSize = flight_span;
perf->msRTT = (double)m_iSRTT / 1000.0;
perf->msSndTsbPdDelay = m_bPeerTsbPd ? m_iPeerTsbPdDelay_ms : 0;
perf->msRcvTsbPdDelay = isOPT_TsbPd() ? m_iTsbPdDelay_ms : 0;
Expand Down Expand Up @@ -8311,9 +8313,9 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_
// Process a lite ACK
if (isLiteAck)
{
ScopedLock ack_lock(m_RecvAckLock);
if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0)
{
ScopedLock ack_lock(m_RecvAckLock);
m_iFlowWindowSize = m_iFlowWindowSize - CSeqNo::seqoff(m_iSndLastAck, ackdata_seqno);
m_iSndLastAck = ackdata_seqno;

Expand Down Expand Up @@ -8881,8 +8883,10 @@ void srt::CUDT::processCtrlHS(const CPacket& ctrlpkt)

// If createSrtHandshake failed, don't send anything. Actually it can only fail on IPE.
// There is also no possible IPE condition in case of HSv4 - for this version it will always return true.
if (createSrtHandshake(SRT_CMD_HSRSP, SRT_CMD_KMRSP, kmdata, kmdatasize,
(response), (initdata)))
enterCS(m_ConnectionLock);
bool create_ok = createSrtHandshake(SRT_CMD_HSRSP, SRT_CMD_KMRSP, kmdata, kmdatasize, (response), (initdata));
leaveCS(m_ConnectionLock);
if (create_ok)
{
response.m_iID = m_PeerID;
setPacketTS(response, steady_clock::now());
Expand Down Expand Up @@ -9605,71 +9609,79 @@ bool srt::CUDT::packData(CPacket& w_packet, steady_clock::time_point& w_nexttime

bool srt::CUDT::packUniqueData(CPacket& w_packet)
{
// Check the congestion/flow window limit
const int cwnd = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow));
const int flightspan = getFlightSpan();
if (cwnd <= flightspan)
{
HLOGC(qslog.Debug,
log << CONID() << "packUniqueData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_dCongestionWindow
<< ")=" << cwnd << " seqlen=(" << m_iSndLastAck << "-" << m_iSndCurrSeqNo << ")=" << flightspan);
return false;
}

// XXX Here it's needed to set kflg to msgno_bitset in the block stored in the
// send buffer. This should be somehow avoided, the crypto flags should be set
// together with encrypting, and the packet should be sent as is, when rexmitting.
// It would be nice to research as to whether CSndBuffer::Block::m_iMsgNoBitset field
// isn't a useless redundant state copy. If it is, then taking the flags here can be removed.
const int kflg = m_pCryptoControl->getSndCryptoFlags();
int pktskipseqno = 0;
int current_sequence_number; // reflexing variable
int kflg;
time_point tsOrigin;
const int pld_size = m_pSndBuffer->readData((w_packet), (tsOrigin), kflg, (pktskipseqno));
if (pktskipseqno)
{
// Some packets were skipped due to TTL expiry.
m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo, pktskipseqno);
HLOGC(qslog.Debug, log << "packUniqueData: reading skipped " << pktskipseqno << " seq up to %" << m_iSndCurrSeqNo
<< " due to TTL expiry");
}
int pld_size;

if (pld_size == 0)
{
HLOGC(qslog.Debug, log << "packUniqueData: nothing extracted from the buffer");
return false;
}
ScopedLock lkrack (m_RecvAckLock);
// Check the congestion/flow window limit
const int cwnd = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow));
const int flightspan = getFlightSpan();
if (cwnd <= flightspan)
{
HLOGC(qslog.Debug,
log << CONID() << "packUniqueData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_dCongestionWindow
<< ")=" << cwnd << " seqlen=(" << m_iSndLastAck << "-" << m_iSndCurrSeqNo << ")=" << flightspan);
return false;
}

// A CHANGE. The sequence number is currently added to the packet
// when scheduling, not when extracting. This is a inter-migration form,
// only override extraction sequence with scheduling sequence in group mode.
m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo);
// XXX Here it's needed to set kflg to msgno_bitset in the block stored in the
// send buffer. This should be somehow avoided, the crypto flags should be set
// together with encrypting, and the packet should be sent as is, when rexmitting.
// It would be nice to research as to whether CSndBuffer::Block::m_iMsgNoBitset field
// isn't a useless redundant state copy. If it is, then taking the flags here can be removed.
kflg = m_pCryptoControl->getSndCryptoFlags();
int pktskipseqno = 0;
pld_size = m_pSndBuffer->readData((w_packet), (tsOrigin), kflg, (pktskipseqno));
if (pktskipseqno)
{
// Some packets were skipped due to TTL expiry.
m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo, pktskipseqno);
HLOGC(qslog.Debug, log << "packUniqueData: reading skipped " << pktskipseqno << " seq up to %" << m_iSndCurrSeqNo
<< " due to TTL expiry");
}

if (pld_size == 0)
{
HLOGC(qslog.Debug, log << "packUniqueData: nothing extracted from the buffer");
return false;
}

// A CHANGE. The sequence number is currently added to the packet
// when scheduling, not when extracting. This is a inter-migration form,
// only override extraction sequence with scheduling sequence in group mode.
m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo);
current_sequence_number = m_iSndCurrSeqNo;
}

#if ENABLE_BONDING
// Fortunately the group itself isn't being accessed.
if (m_parent->m_GroupOf)
{
const int packetspan = CSeqNo::seqoff(m_iSndCurrSeqNo, w_packet.m_iSeqNo);
const int packetspan = CSeqNo::seqoff(current_sequence_number, w_packet.m_iSeqNo);
if (packetspan > 0)
{
// After increasing by 1, but being previously set as ISN-1, this should be == ISN,
// if this is the very first packet to send.
if (m_iSndCurrSeqNo == m_iISN)
if (current_sequence_number == m_iISN)
{
// This is the very first packet to be sent; so there's nothing in
// the sending buffer yet, and therefore we are in a situation as just
// after connection. No packets in the buffer, no packets are sent,
// no ACK to be awaited. We can screw up all the variables that are
// initialized from ISN just after connection.
LOGC(qslog.Note,
log << CONID() << "packUniqueData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo
log << CONID() << "packUniqueData: Fixing EXTRACTION sequence " << current_sequence_number
<< " from SCHEDULING sequence " << w_packet.m_iSeqNo << " for the first packet: DIFF="
<< packetspan << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength()));
}
else
{
// There will be a serious data discrepancy between the agent and the peer.
LOGC(qslog.Error,
log << CONID() << "IPE: packUniqueData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo
log << CONID() << "IPE: packUniqueData: Fixing EXTRACTION sequence " << current_sequence_number
<< " from SCHEDULING sequence " << w_packet.m_iSeqNo << " in the middle of transition: DIFF="
<< packetspan << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength()));
}
Expand All @@ -9678,7 +9690,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet)
// won't stupidly request the packets to be retransmitted.
// Don't do it if the difference isn't positive or exceeds the threshold.
int32_t seqpair[2];
seqpair[0] = m_iSndCurrSeqNo;
seqpair[0] = current_sequence_number;
seqpair[1] = CSeqNo::decseq(w_packet.m_iSeqNo);
const int32_t no_msgno = 0;
LOGC(qslog.Debug,
Expand All @@ -9690,8 +9702,8 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet)
// packet are not present in the buffer (preadte the send buffer).

// Override extraction sequence with scheduling sequence.
m_iSndCurrSeqNo = w_packet.m_iSeqNo;
ScopedLock ackguard(m_RecvAckLock);
m_iSndCurrSeqNo = w_packet.m_iSeqNo;
m_iSndLastAck = w_packet.m_iSeqNo;
m_iSndLastDataAck = w_packet.m_iSeqNo;
m_iSndLastFullAck = w_packet.m_iSeqNo;
Expand All @@ -9701,7 +9713,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet)
{
LOGC(qslog.Error,
log << CONID() << "IPE: packData: SCHEDULING sequence " << w_packet.m_iSeqNo
<< " is behind of EXTRACTION sequence " << m_iSndCurrSeqNo << ", dropping this packet: DIFF="
<< " is behind of EXTRACTION sequence " << current_sequence_number << ", dropping this packet: DIFF="
<< packetspan << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength()));
// XXX: Probably also change the socket state to broken?
return false;
Expand All @@ -9711,12 +9723,12 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet)
#endif
{
HLOGC(qslog.Debug,
log << CONID() << "packUniqueData: Applying EXTRACTION sequence " << m_iSndCurrSeqNo
log << CONID() << "packUniqueData: Applying EXTRACTION sequence " << current_sequence_number
<< " over SCHEDULING sequence " << w_packet.m_iSeqNo << " for socket not in group:"
<< " DIFF=" << CSeqNo::seqcmp(m_iSndCurrSeqNo, w_packet.m_iSeqNo)
<< " DIFF=" << CSeqNo::seqcmp(current_sequence_number, w_packet.m_iSeqNo)
<< " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength()));
// Do this always when not in a group.
w_packet.m_iSeqNo = m_iSndCurrSeqNo;
w_packet.m_iSeqNo = current_sequence_number;
}

// Set missing fields before encrypting the packet, because those fields might be used for encryption.
Expand Down
3 changes: 2 additions & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,7 @@ class CUDT
// require only the lost sequence number, and how to find the packet with this sequence
// will be up to the sending buffer.
sync::atomic<int32_t> m_iSndLastDataAck; // The real last ACK that updates the sender buffer and loss list
SRT_ATTR_GUARDED_BY(m_RecvAckLock)
sync::atomic<int32_t> m_iSndCurrSeqNo; // The largest sequence number that HAS BEEN SENT
sync::atomic<int32_t> m_iSndNextSeqNo; // The sequence number predicted to be placed at the currently scheduled packet

Expand Down Expand Up @@ -999,7 +1000,7 @@ class CUDT

mutable sync::Mutex m_RcvBufferLock; // Protects the state of the m_pRcvBuffer
// Protects access to m_iSndCurrSeqNo, m_iSndLastAck
sync::Mutex m_RecvAckLock; // Protects the state changes while processing incoming ACK (SRT_EPOLL_OUT)
mutable sync::Mutex m_RecvAckLock; // Protects the state changes while processing incoming ACK (SRT_EPOLL_OUT)

sync::Condition m_RecvDataCond; // used to block "srt_recv*" when there is no data. Use together with m_RecvLock
sync::Mutex m_RecvLock; // used to synchronize "srt_recv*" call, protects TSBPD drift updates (CRcvBuffer::isRcvDataReady())
Expand Down
4 changes: 4 additions & 0 deletions srtcore/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,7 @@ class CUniqueSync: public CSync

UniqueLock& locker() { return m_ulock; }

SRT_ATTR_ACQUIRE(this->m_ulock.mutex())
CUniqueSync(Mutex& mut, Condition& cnd)
: CSync(cnd, m_ulock)
, m_ulock(mut)
Expand All @@ -681,6 +682,9 @@ class CUniqueSync: public CSync
{
}

SRT_ATTR_RELEASE(this->m_ulock.mutex())
~CUniqueSync() {}

// These functions can be used safely because
// this whole class guarantees that whatever happens
// while its object exists is that the mutex is locked.
Expand Down

0 comments on commit 6c723e5

Please sign in to comment.