Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Fixes lacking mutex protection to some ACK-related fields #2723

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 61 additions & 49 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6409,7 +6409,7 @@ int srt::CUDT::sndDropTooLate()
m_iSndCurrSeqNo = minlastack;
}

HLOGC(aslog.Debug,
HLOGC(qslog.Debug,
log << CONID() << "SND-DROP: %(" << realack << "-" << m_iSndCurrSeqNo << ") n=" << dpkts << "pkt " << dbytes
<< "B, span=" << buffdelay_ms << " ms, FIRST #" << first_msgno);

Expand Down Expand Up @@ -7315,6 +7315,8 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)

const int pktHdrSize = CPacket::HDR_SIZE + CPacket::UDP_HDR_SIZE;
{
int32_t flight_span = getFlightSpan();

ScopedLock statsguard(m_StatsLock);

const steady_clock::time_point currtime = steady_clock::now();
Expand Down Expand Up @@ -7398,7 +7400,7 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)
perf->usPktSndPeriod = (double) count_microseconds(m_tdSendInterval.load());
perf->pktFlowWindow = m_iFlowWindowSize.load();
perf->pktCongestionWindow = (int)m_dCongestionWindow;
perf->pktFlightSize = getFlightSpan();
perf->pktFlightSize = flight_span;
perf->msRTT = (double)m_iSRTT / 1000.0;
perf->msSndTsbPdDelay = m_bPeerTsbPd ? m_iPeerTsbPdDelay_ms : 0;
perf->msRcvTsbPdDelay = isOPT_TsbPd() ? m_iTsbPdDelay_ms : 0;
Expand Down Expand Up @@ -8286,9 +8288,9 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_
// Process a lite ACK
if (isLiteAck)
{
ScopedLock ack_lock(m_RecvAckLock);
if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0)
{
ScopedLock ack_lock(m_RecvAckLock);
m_iFlowWindowSize = m_iFlowWindowSize - CSeqNo::seqoff(m_iSndLastAck, ackdata_seqno);
m_iSndLastAck = ackdata_seqno;

Expand Down Expand Up @@ -8846,8 +8848,10 @@ void srt::CUDT::processCtrlHS(const CPacket& ctrlpkt)

// If createSrtHandshake failed, don't send anything. Actually it can only fail on IPE.
// There is also no possible IPE condition in case of HSv4 - for this version it will always return true.
if (createSrtHandshake(SRT_CMD_HSRSP, SRT_CMD_KMRSP, kmdata, kmdatasize,
(response), (initdata)))
enterCS(m_ConnectionLock);
bool create_ok = createSrtHandshake(SRT_CMD_HSRSP, SRT_CMD_KMRSP, kmdata, kmdatasize, (response), (initdata));
leaveCS(m_ConnectionLock);
if (create_ok)
{
response.m_iID = m_PeerID;
setPacketTS(response, steady_clock::now());
Expand Down Expand Up @@ -9554,71 +9558,79 @@ bool srt::CUDT::packData(CPacket& w_packet, steady_clock::time_point& w_nexttime

bool srt::CUDT::packUniqueData(CPacket& w_packet)
{
// Check the congestion/flow window limit
const int cwnd = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow));
const int flightspan = getFlightSpan();
if (cwnd <= flightspan)
{
HLOGC(qslog.Debug,
log << CONID() << "packUniqueData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_dCongestionWindow
<< ")=" << cwnd << " seqlen=(" << m_iSndLastAck << "-" << m_iSndCurrSeqNo << ")=" << flightspan);
return false;
}

// XXX Here it's needed to set kflg to msgno_bitset in the block stored in the
// send buffer. This should be somehow avoided, the crypto flags should be set
// together with encrypting, and the packet should be sent as is, when rexmitting.
// It would be nice to research as to whether CSndBuffer::Block::m_iMsgNoBitset field
// isn't a useless redundant state copy. If it is, then taking the flags here can be removed.
const int kflg = m_pCryptoControl->getSndCryptoFlags();
int pktskipseqno = 0;
int current_sequence_number; // reflexing variable
int kflg;
time_point tsOrigin;
const int pld_size = m_pSndBuffer->readData((w_packet), (tsOrigin), kflg, (pktskipseqno));
if (pktskipseqno)
{
// Some packets were skipped due to TTL expiry.
m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo, pktskipseqno);
HLOGC(qslog.Debug, log << "packUniqueData: reading skipped " << pktskipseqno << " seq up to %" << m_iSndCurrSeqNo
<< " due to TTL expiry");
}
int pld_size;

if (pld_size == 0)
{
HLOGC(qslog.Debug, log << "packUniqueData: nothing extracted from the buffer");
return false;
}
ScopedLock lkrack (m_RecvAckLock);
// Check the congestion/flow window limit
const int cwnd = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow));
const int flightspan = getFlightSpan();
if (cwnd <= flightspan)
{
HLOGC(qslog.Debug,
log << CONID() << "packUniqueData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_dCongestionWindow
<< ")=" << cwnd << " seqlen=(" << m_iSndLastAck << "-" << m_iSndCurrSeqNo << ")=" << flightspan);
return false;
}

// A CHANGE. The sequence number is currently added to the packet
// when scheduling, not when extracting. This is a inter-migration form,
// only override extraction sequence with scheduling sequence in group mode.
m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo);
// XXX Here it's needed to set kflg to msgno_bitset in the block stored in the
// send buffer. This should be somehow avoided, the crypto flags should be set
// together with encrypting, and the packet should be sent as is, when rexmitting.
// It would be nice to research as to whether CSndBuffer::Block::m_iMsgNoBitset field
// isn't a useless redundant state copy. If it is, then taking the flags here can be removed.
kflg = m_pCryptoControl->getSndCryptoFlags();
int pktskipseqno = 0;
pld_size = m_pSndBuffer->readData((w_packet), (tsOrigin), kflg, (pktskipseqno));
if (pktskipseqno)
{
// Some packets were skipped due to TTL expiry.
m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo, pktskipseqno);
HLOGC(qslog.Debug, log << "packUniqueData: reading skipped " << pktskipseqno << " seq up to %" << m_iSndCurrSeqNo
<< " due to TTL expiry");
}

if (pld_size == 0)
{
HLOGC(qslog.Debug, log << "packUniqueData: nothing extracted from the buffer");
return false;
}

// A CHANGE. The sequence number is currently added to the packet
// when scheduling, not when extracting. This is a inter-migration form,
// only override extraction sequence with scheduling sequence in group mode.
m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo);
current_sequence_number = m_iSndCurrSeqNo;
}

#if ENABLE_BONDING
// Fortunately the group itself isn't being accessed.
if (m_parent->m_GroupOf)
{
const int packetspan = CSeqNo::seqoff(m_iSndCurrSeqNo, w_packet.m_iSeqNo);
const int packetspan = CSeqNo::seqoff(current_sequence_number, w_packet.m_iSeqNo);
if (packetspan > 0)
{
// After increasing by 1, but being previously set as ISN-1, this should be == ISN,
// if this is the very first packet to send.
if (m_iSndCurrSeqNo == m_iISN)
if (current_sequence_number == m_iISN)
{
// This is the very first packet to be sent; so there's nothing in
// the sending buffer yet, and therefore we are in a situation as just
// after connection. No packets in the buffer, no packets are sent,
// no ACK to be awaited. We can screw up all the variables that are
// initialized from ISN just after connection.
LOGC(qslog.Note,
log << CONID() << "packUniqueData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo
log << CONID() << "packUniqueData: Fixing EXTRACTION sequence " << current_sequence_number
<< " from SCHEDULING sequence " << w_packet.m_iSeqNo << " for the first packet: DIFF="
<< packetspan << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength()));
}
else
{
// There will be a serious data discrepancy between the agent and the peer.
LOGC(qslog.Error,
log << CONID() << "IPE: packUniqueData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo
log << CONID() << "IPE: packUniqueData: Fixing EXTRACTION sequence " << current_sequence_number
<< " from SCHEDULING sequence " << w_packet.m_iSeqNo << " in the middle of transition: DIFF="
<< packetspan << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength()));
}
Expand All @@ -9627,7 +9639,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet)
// won't stupidly request the packets to be retransmitted.
// Don't do it if the difference isn't positive or exceeds the threshold.
int32_t seqpair[2];
seqpair[0] = m_iSndCurrSeqNo;
seqpair[0] = current_sequence_number;
seqpair[1] = CSeqNo::decseq(w_packet.m_iSeqNo);
const int32_t no_msgno = 0;
LOGC(qslog.Debug,
Expand All @@ -9639,8 +9651,8 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet)
// packet are not present in the buffer (preadte the send buffer).

// Override extraction sequence with scheduling sequence.
m_iSndCurrSeqNo = w_packet.m_iSeqNo;
ScopedLock ackguard(m_RecvAckLock);
m_iSndCurrSeqNo = w_packet.m_iSeqNo;
m_iSndLastAck = w_packet.m_iSeqNo;
m_iSndLastDataAck = w_packet.m_iSeqNo;
m_iSndLastFullAck = w_packet.m_iSeqNo;
Expand All @@ -9650,7 +9662,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet)
{
LOGC(qslog.Error,
log << CONID() << "IPE: packData: SCHEDULING sequence " << w_packet.m_iSeqNo
<< " is behind of EXTRACTION sequence " << m_iSndCurrSeqNo << ", dropping this packet: DIFF="
<< " is behind of EXTRACTION sequence " << current_sequence_number << ", dropping this packet: DIFF="
<< packetspan << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength()));
// XXX: Probably also change the socket state to broken?
return false;
Expand All @@ -9660,12 +9672,12 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet)
#endif
{
HLOGC(qslog.Debug,
log << CONID() << "packUniqueData: Applying EXTRACTION sequence " << m_iSndCurrSeqNo
log << CONID() << "packUniqueData: Applying EXTRACTION sequence " << current_sequence_number
<< " over SCHEDULING sequence " << w_packet.m_iSeqNo << " for socket not in group:"
<< " DIFF=" << CSeqNo::seqcmp(m_iSndCurrSeqNo, w_packet.m_iSeqNo)
<< " DIFF=" << CSeqNo::seqcmp(current_sequence_number, w_packet.m_iSeqNo)
<< " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength()));
// Do this always when not in a group.
w_packet.m_iSeqNo = m_iSndCurrSeqNo;
w_packet.m_iSeqNo = current_sequence_number;
}

// Set missing fields before encrypting the packet, because those fields might be used for encryption.
Expand Down
3 changes: 2 additions & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,7 @@ class CUDT
// require only the lost sequence number, and how to find the packet with this sequence
// will be up to the sending buffer.
sync::atomic<int32_t> m_iSndLastDataAck; // The real last ACK that updates the sender buffer and loss list
SRT_ATTR_GUARDED_BY(m_RecvAckLock)
sync::atomic<int32_t> m_iSndCurrSeqNo; // The largest sequence number that HAS BEEN SENT
sync::atomic<int32_t> m_iSndNextSeqNo; // The sequence number predicted to be placed at the currently scheduled packet

Expand Down Expand Up @@ -975,7 +976,7 @@ class CUDT

mutable sync::Mutex m_RcvBufferLock; // Protects the state of the m_pRcvBuffer
// Protects access to m_iSndCurrSeqNo, m_iSndLastAck
sync::Mutex m_RecvAckLock; // Protects the state changes while processing incoming ACK (SRT_EPOLL_OUT)
mutable sync::Mutex m_RecvAckLock; // Protects the state changes while processing incoming ACK (SRT_EPOLL_OUT)

sync::Condition m_RecvDataCond; // used to block "srt_recv*" when there is no data. Use together with m_RecvLock
sync::Mutex m_RecvLock; // used to synchronize "srt_recv*" call, protects TSBPD drift updates (CRcvBuffer::isRcvDataReady())
Expand Down
4 changes: 4 additions & 0 deletions srtcore/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,7 @@ class CUniqueSync: public CSync

UniqueLock& locker() { return m_ulock; }

SRT_ATTR_ACQUIRE(this->m_ulock.mutex())
CUniqueSync(Mutex& mut, Condition& cnd)
: CSync(cnd, m_ulock)
, m_ulock(mut)
Expand All @@ -681,6 +682,9 @@ class CUniqueSync: public CSync
{
}

SRT_ATTR_RELEASE(this->m_ulock.mutex())
~CUniqueSync() {}

// These functions can be used safely because
// this whole class guarantees that whatever happens
// while its object exists is that the mutex is locked.
Expand Down