Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Refactored ACK window management to avoid false error reports #1888

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 15 additions & 18 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ void srt::CUDT::clearData()
// XXX use some constant for this 16
m_iDeliveryRate = 16;
m_iByteDeliveryRate = 16 * m_iMaxSRTPayloadSize;
m_iAckSeqNo = 0;
m_iAckJournal = 0;
m_tsLastAckTime = steady_clock::now();

// trace information
Expand Down Expand Up @@ -7842,7 +7842,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
// than sequence number (it's a "journal" for ACK request-response,
// and starts from 0, unlike sequence, which starts from a random
// number), but still the numbers are from exactly the same domain.
m_iAckSeqNo = CAckNo::incack(m_iAckSeqNo);
m_iAckJournal = CAckNo::incack(m_iAckJournal);
data[ACKD_RCVLASTACK] = m_iRcvLastAck;
data[ACKD_RTT] = m_iSRTT;
data[ACKD_RTTVAR] = m_iRTTVar;
Expand Down Expand Up @@ -7874,20 +7874,20 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
}
// ELSE: leave the buffer with ...UDTBASE size.

ctrlpkt.pack(UMSG_ACK, &m_iAckSeqNo, data, ctrlsz);
ctrlpkt.pack(UMSG_ACK, &m_iAckJournal, data, ctrlsz);
m_tsLastAckTime = steady_clock::now();
}
else
{
ctrlpkt.pack(UMSG_ACK, &m_iAckSeqNo, data, ACKD_FIELD_SIZE * ACKD_TOTAL_SIZE_SMALL);
ctrlpkt.pack(UMSG_ACK, &m_iAckJournal, data, ACKD_FIELD_SIZE * ACKD_TOTAL_SIZE_SMALL);
}

ctrlpkt.m_iID = m_PeerID;
setPacketTS(ctrlpkt, steady_clock::now());
nbsent = m_pSndQueue->sendto(m_PeerAddr, ctrlpkt);
DebugAck("sendCtrl(UMSG_ACK): " + CONID(), local_prevack, ack);

m_ACKWindow.store(m_iAckSeqNo, m_iRcvLastAck);
m_ACKWindow.store(m_iAckJournal, m_iRcvLastAck);

enterCS(m_StatsLock);
++m_stats.sentACK;
Expand Down Expand Up @@ -8251,23 +8251,20 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr
int32_t ack = 0;

// Calculate RTT estimate on the receiver side based on ACK/ACKACK pair.
const int rtt = m_ACKWindow.acknowledge(ctrlpkt.getAckSeqNo(), ack, tsArrival);

if (rtt == -1)
int rtt = 0;
const ACKWindow::Status astat = m_ACKWindow.acknowledge(ctrlpkt.getAckSeqNo(), tsArrival, (ack), (rtt));
if (astat != ACKWindow::OK)
{
if (ctrlpkt.getAckSeqNo() > (m_iAckSeqNo - static_cast<int>(ACK_WND_SIZE)) && ctrlpkt.getAckSeqNo() <= m_iAckSeqNo)
if (astat != ACKWindow::OLD) // ignore old - can't measure, but that's not a problem
{
LOGC(inlog.Warn,
log << CONID() << "ACKACK out of order, skipping RTT calculation "
<< "(ACK number: " << ctrlpkt.getAckSeqNo() << ", last ACK sent: " << m_iAckSeqNo
<< ", RTT (EWMA): " << m_iSRTT << ")");
return;
// For WIPED and ROGUE report this by an error log; this is an unwanted situation.
LOGC(inlog.Error,
log << CONID() << "IPE/EPE: ACK node overwritten when acknowledging "
<< ctrlpkt.getAckSeqNo()
<< (astat == ACKWindow::WIPED ? ": No such node recorded when ACKing"
: ": ROGUE journal value"));
}

LOGC(inlog.Error,
log << CONID() << "IPE: ACK record not found, can't estimate RTT "
<< "(ACK number: " << ctrlpkt.getAckSeqNo() << ", last ACK sent: " << m_iAckSeqNo
<< ", RTT (EWMA): " << m_iSRTT << ")");
return;
}

Expand Down
2 changes: 1 addition & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ class CUDT
#endif
int32_t m_iRcvLastSkipAck; // Last dropped sequence ACK
int32_t m_iRcvLastAckAck; // Last sent ACK that has been acknowledged
int32_t m_iAckSeqNo; // Last ACK sequence number
int32_t m_iAckJournal; // Last ACK sequence number
srt::sync::atomic<int32_t> m_iRcvCurrSeqNo; // Largest received sequence number
int32_t m_iRcvCurrPhySeqNo; // Same as m_iRcvCurrSeqNo, but physical only (disregarding a filter)

Expand Down
165 changes: 145 additions & 20 deletions srtcore/window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,83 +61,208 @@ modified by
using namespace std;
using namespace srt::sync;

namespace ACKWindowTools
namespace ACKWindow
{

void store(Seq* r_aSeq, const size_t size, int& r_iHead, int& r_iTail, int32_t seq, int32_t ack)
void store(AckNode* r_aSeq, const size_t size, int& r_iHead, int& r_iTail, int32_t jrn, int32_t ackseq)
{
r_aSeq[r_iHead].iACKSeqNo = seq;
r_aSeq[r_iHead].iACK = ack;
r_aSeq[r_iHead].tsTimeStamp = steady_clock::now();
r_aSeq[r_iHead].iJournal = jrn;
r_aSeq[r_iHead].iAckSeq = ackseq;

r_iHead = (r_iHead + 1) % size;

// overwrite the oldest ACK since it is not likely to be acknowledged
// Overwrite the oldest ACK since it is not likely to be acknowledged.
// Eat your own tail.
if (r_iHead == r_iTail)
r_iTail = (r_iTail + 1) % size;
}

int acknowledge(Seq* r_aSeq, const size_t size, int& r_iHead, int& r_iTail, int32_t seq, int32_t& r_ack, const steady_clock::time_point& currtime)
struct Range
{
int begin, end;
};

struct FIsJournal
{
int32_t jrn;
FIsJournal(int32_t v): jrn(v) {}

bool operator()(const AckNode& node) const
{
return node.iJournal == jrn;
}
};

Status acknowledge(AckNode* r_aSeq, const size_t size, int& r_iHead, int& r_iTail, int32_t jrn, const steady_clock::time_point& currtime, int32_t& w_ack, int32_t& w_timediff)
{
Range range1, range2;
range1.begin = r_iTail;
range2.begin = 0;
if (r_iHead < r_iTail)
{
// range2: [0 ... r_iHead] ... range1:[r_iTail ... end]
range1.end = size;
range2.end = r_iHead;
}
else
{
// [0 ... r_iTail-1] range1:[r_iTail ... r_iHead] [... end], range2: [0-0] (empty)
range1.end = r_iHead;
range2.end = 0;
}

// Here we are certain that the range1 is contiguous and nonempty
// Contiguous is by extracting two contiguous ranges in case when the
// original range was non-contiguous.
// Emptiness is checked here:
if (range1.begin == range1.end)
{
// This can be as well rogue, but with empty
// container it would cost a lot of checks to
// confirm that it was the case, not worth a shot.
return OLD;
}

// Check the first range.
// The first range is always "older" than the second range, if the second one exists.
if (CSeqNo::seqcmp(jrn, r_aSeq[range1.begin].iJournal) < 0)
{
return OLD;
}

int found = -1;

if (CSeqNo::seqcmp(jrn, r_aSeq[range1.end-1].iJournal) <= 0)
{
// We have the value within this range, check if exists.
AckNode* pos = std::find_if(r_aSeq + range1.begin, r_aSeq + range1.end, FIsJournal(jrn));
if (pos == r_aSeq + range1.end)
return WIPED;

found = pos - r_aSeq;
}
else
{
// Not within the first range, check the second range.
// If second range is empty, report this as ROGUE.
if (range2.begin == range2.end)
{
return ROGUE;
}

if (CSeqNo::seqcmp(jrn, r_aSeq[range2.begin].iJournal < 0))
{
// The value is above range1, but below range2. Hence, not found.
return WIPED;
}

if (CSeqNo::seqcmp(jrn, r_aSeq[range2.end-1].iJournal) <= 0)
{
// We have the value within this range, check if exists.
AckNode* pos = std::find_if(r_aSeq + range2.begin, r_aSeq + range2.end, FIsJournal(jrn));
if (pos == r_aSeq + range1.end)
return WIPED;
found = pos - r_aSeq;
}
else
{
// ABOVE range2 - ROGUE
return ROGUE;
}
}

// As long as none of the above did abnormal termination by early return,
// pos contains our required node.
w_ack = r_aSeq[found].iAckSeq;
w_timediff = count_microseconds(currtime - r_aSeq[found].tsTimeStamp);

int inext = found + 1;
if (inext == r_iHead)
{
// Clear the container completely.
r_iHead = 0;
r_iTail = 0;
r_aSeq[0].iJournal = SRT_SEQNO_NONE;
r_aSeq[0].iAckSeq = SRT_SEQNO_NONE;
r_aSeq[0].tsTimeStamp = steady_clock::time_point();
}
else
{
// Just cut the tail.
if (inext == int(size))
{
inext = 0;
}
r_iTail = inext;
// Keep r_iHead in existing position.
}

return OK;
}

/* Updated old version remains for historical reasons
Status old_acknowledge(AckNode* r_aSeq, const size_t size, int& r_iHead, int& r_iTail, int32_t jrn, int32_t& w_ack, const steady_clock::time_point& currtime, int32_t& w_timediff)
{
// Head has not exceeded the physical boundary of the window
if (r_iHead >= r_iTail)
{
for (int i = r_iTail, n = r_iHead; i < n; ++ i)
{
// Looking for an identical ACK Seq. No.
if (seq == r_aSeq[i].iACKSeqNo)
// looking for an identical ACK AckNode. No.
if (jrn == r_aSeq[i].iJournal)
{
// Return the Data ACK it carried
r_ack = r_aSeq[i].iACK;
w_ack = r_aSeq[i].iAckSeq;

// Calculate RTT estimate
const int rtt = count_microseconds(currtime - r_aSeq[i].tsTimeStamp);
w_timediff = count_microseconds(currtime - r_aSeq[i].tsTimeStamp);

if (i + 1 == r_iHead)
{
r_iTail = r_iHead = 0;
r_aSeq[0].iACKSeqNo = SRT_SEQNO_NONE;
r_aSeq[0].iJournal = SRT_SEQNO_NONE;
}
else
r_iTail = (i + 1) % size;

return rtt;
return OK;
}
}

// The record about ACK is not found in the buffer, RTT can not be calculated
return -1;
return ROGUE;
}

// Head has exceeded the physical window boundary, so it is behind tail
for (int j = r_iTail, n = r_iHead + size; j < n; ++ j)
{
// Looking for an identical ACK Seq. No.
if (seq == r_aSeq[j % size].iACKSeqNo)
if (jrn == r_aSeq[j % size].iJournal)
{
// Return the Data ACK it carried
j %= size;
r_ack = r_aSeq[j].iACK;
w_ack = r_aSeq[j].iAckSeq;

// Calculate RTT estimate
const int rtt = count_microseconds(currtime - r_aSeq[j].tsTimeStamp);
w_timediff = count_microseconds(currtime - r_aSeq[j].tsTimeStamp);

if (j == r_iHead)
{
r_iTail = r_iHead = 0;
r_aSeq[0].iACKSeqNo = -1;
r_aSeq[0].iJournal = -1;
}
else
r_iTail = (j + 1) % size;

return rtt;
return OK;
}
}

// The record about ACK is not found in the buffer, RTT can not be calculated
return -1;
return ROGUE;
}

*/
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
Loading