From bfd4033ba2bede1ffdf4b00b78351bf95083c94e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Wed, 24 Mar 2021 17:36:09 +0100 Subject: [PATCH 1/4] [core] Refactored ACK window management to avoid false error reports --- srtcore/core.cpp | 30 ++++--- srtcore/core.h | 2 +- srtcore/window.cpp | 168 +++++++++++++++++++++++++++++++++++----- srtcore/window.h | 50 ++++++++---- test/test_utilities.cpp | 122 +++++++++++++++++++++++++++++ 5 files changed, 327 insertions(+), 45 deletions(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index c31b7088a..620592d76 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -809,7 +809,7 @@ void CUDT::clearData() // XXX use some constant for this 16 m_iDeliveryRate = 16; m_iByteDeliveryRate = 16 * m_iMaxSRTPayloadSize; - m_iAckSeqNo = 0; + m_iAckJournal = 0; m_tsLastAckTime = steady_clock::now(); // trace information @@ -7679,7 +7679,7 @@ int CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) // than sequence number (it's a "journal" for ACK request-response, // and starts from 0, unlike sequence, which starts from a random // number), but still the numbers are from exactly the same domain. - m_iAckSeqNo = CAckNo::incack(m_iAckSeqNo); + m_iAckJournal = CAckNo::incack(m_iAckJournal); data[ACKD_RCVLASTACK] = m_iRcvLastAck; data[ACKD_RTT] = m_iRTT; data[ACKD_RTTVAR] = m_iRTTVar; @@ -7711,12 +7711,12 @@ int CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) } // ELSE: leave the buffer with ...UDTBASE size. - ctrlpkt.pack(UMSG_ACK, &m_iAckSeqNo, data, ctrlsz); + ctrlpkt.pack(UMSG_ACK, &m_iAckJournal, data, ctrlsz); m_tsLastAckTime = steady_clock::now(); } else { - ctrlpkt.pack(UMSG_ACK, &m_iAckSeqNo, data, ACKD_FIELD_SIZE * ACKD_TOTAL_SIZE_SMALL); + ctrlpkt.pack(UMSG_ACK, &m_iAckJournal, data, ACKD_FIELD_SIZE * ACKD_TOTAL_SIZE_SMALL); } ctrlpkt.m_iID = m_PeerID; @@ -7724,7 +7724,7 @@ int CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) nbsent = m_pSndQueue->sendto(m_PeerAddr, ctrlpkt); DebugAck("sendCtrl(UMSG_ACK): " + CONID(), local_prevack, ack); - m_ACKWindow.store(m_iAckSeqNo, m_iRcvLastAck); + m_ACKWindow.store(m_iAckJournal, m_iRcvLastAck); enterCS(m_StatsLock); ++m_stats.sentACK; @@ -8198,13 +8198,21 @@ void CUDT::processCtrl(const CPacket &ctrlpkt) case UMSG_ACKACK: // 110 - Acknowledgement of Acknowledgement { int32_t ack = 0; - const int rtt = m_ACKWindow.acknowledge(ctrlpkt.getAckSeqNo(), ack); - if (rtt <= 0) + int rtt = 0; + ACKWindow::Status astat = m_ACKWindow.acknowledge(ctrlpkt.getAckSeqNo(), (ack), (rtt)); + if (astat != ACKWindow::OK) { - LOGC(inlog.Error, - log << CONID() << "IPE: ACK node overwritten when acknowledging " << ctrlpkt.getAckSeqNo() - << " (ack extracted: " << ack << ")"); - break; + if (astat != ACKWindow::OLD) // ignore old - can't measure, but that's not a problem + { + // For WIPED and ROGUE report this by an error log; this is an unwanted situation. + LOGC(inlog.Error, + log << CONID() << "IPE/EPE: ACK node overwritten when acknowledging " + << ctrlpkt.getAckSeqNo() + << (astat == ACKWindow::WIPED ? ": No such node recorded when ACKing" + : ": ROGUE journal value")); + } + + break; // Do not measure RTT because the previous node cannot be found. } // if increasing delay detected... diff --git a/srtcore/core.h b/srtcore/core.h index 0a4c2d86d..c3f8780b8 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -859,7 +859,7 @@ class CUDT #endif int32_t m_iRcvLastSkipAck; // Last dropped sequence ACK int32_t m_iRcvLastAckAck; // Last sent ACK that has been acknowledged - int32_t m_iAckSeqNo; // Last ACK sequence number + int32_t m_iAckJournal; // Last ACK sequence number int32_t m_iRcvCurrSeqNo; // Largest received sequence number int32_t m_iRcvCurrPhySeqNo; // Same as m_iRcvCurrSeqNo, but physical only (disregarding a filter) diff --git a/srtcore/window.cpp b/srtcore/window.cpp index 0c40af128..ba39965d5 100644 --- a/srtcore/window.cpp +++ b/srtcore/window.cpp @@ -61,23 +61,150 @@ modified by using namespace std; using namespace srt::sync; -namespace ACKWindowTools +namespace ACKWindow { -void store(Seq* r_aSeq, const size_t size, int& r_iHead, int& r_iTail, int32_t seq, int32_t ack) +void store(AckNode* r_aSeq, const size_t size, int& r_iHead, int& r_iTail, int32_t jrn, int32_t ackseq) { - r_aSeq[r_iHead].iACKSeqNo = seq; - r_aSeq[r_iHead].iACK = ack; r_aSeq[r_iHead].tsTimeStamp = steady_clock::now(); + r_aSeq[r_iHead].iJournal = jrn; + r_aSeq[r_iHead].iAckSeq = ackseq; r_iHead = (r_iHead + 1) % size; - // overwrite the oldest ACK since it is not likely to be acknowledged + // Overwrite the oldest ACK since it is not likely to be acknowledged. + // Eat your own tail. if (r_iHead == r_iTail) r_iTail = (r_iTail + 1) % size; } -int acknowledge(Seq* r_aSeq, const size_t size, int& r_iHead, int& r_iTail, int32_t seq, int32_t& r_ack) +struct Range +{ + int begin, end; +}; + +struct FIsJournal +{ + int32_t jrn; + FIsJournal(int32_t v): jrn(v) {} + + bool operator()(const AckNode& node) const + { + return node.iJournal == jrn; + } +}; + +Status acknowledge(AckNode* r_aSeq, const size_t size, int& r_iHead, int& r_iTail, int32_t jrn, int32_t& w_ack, int32_t& w_timediff) +{ + steady_clock::time_point now = steady_clock::now(); + + Range range1, range2; + range1.begin = r_iTail; + range2.begin = 0; + if (r_iHead < r_iTail) + { + // range2: [0 ... r_iHead] ... range1:[r_iTail ... end] + range1.end = size; + range2.end = r_iHead; + } + else + { + // [0 ... r_iTail-1] range1:[r_iTail ... r_iHead] [... end], range2: [0-0] (empty) + range1.end = r_iHead; + range2.end = 0; + } + + // Here we are certain that the range1 is contiguous and nonempty + // Continuous is by extracting two contiguous ranges in case when the + // original range was non-contiguous. + // Emptiness is checked here: + if (range1.begin == range1.end) + { + // This can be as well rogue, but with empty + // container it would cost a lot of checks to + // confirm that it was the case, not worth a shot. + return OLD; + } + + // Check the first range. + // The first range is always "older" than the second range, if the second one exists. + if (CSeqNo::seqcmp(jrn, r_aSeq[range1.begin].iJournal) < 0) + { + return OLD; + } + + int found = -1; + + if (CSeqNo::seqcmp(jrn, r_aSeq[range1.end-1].iJournal) <= 0) + { + // We have the value within this range, check if exists. + AckNode* pos = std::find_if(r_aSeq + range1.begin, r_aSeq + range1.end, FIsJournal(jrn)); + if (pos == r_aSeq + range1.end) + return WIPED; + + found = pos - r_aSeq; + } + else + { + // Not within the first range, check the second range. + // If second range is empty, report this as ROGUE. + if (range2.begin == range2.end) + { + return ROGUE; + } + + if (CSeqNo::seqcmp(jrn, r_aSeq[range2.begin].iJournal < 0)) + { + // The value is above range1, but below range2. Hence, not found. + return WIPED; + } + + if (CSeqNo::seqcmp(jrn, r_aSeq[range2.end-1].iJournal) <= 0) + { + // We have the value within this range, check if exists. + AckNode* pos = std::find_if(r_aSeq + range2.begin, r_aSeq + range2.end, FIsJournal(jrn)); + if (pos == r_aSeq + range1.end) + return WIPED; + found = pos - r_aSeq; + } + else + { + // ABOVE range2 - ROGUE + return ROGUE; + } + } + + // As long as none of the above did abnormal termination by early return, + // pos contains our required node. + w_ack = r_aSeq[found].iAckSeq; + w_timediff = count_microseconds(steady_clock::now() - r_aSeq[found].tsTimeStamp); + + int inext = found + 1; + if (inext == r_iHead) + { + // Clear the container completely. + r_iHead = 0; + r_iTail = 0; + r_aSeq[0].iJournal = SRT_SEQNO_NONE; + r_aSeq[0].iAckSeq = SRT_SEQNO_NONE; + r_aSeq[0].tsTimeStamp = steady_clock::time_point(); + } + else + { + // Just cut the tail. + if (inext == int(size)) + { + inext = 0; + } + r_iTail = inext; + // Keep r_iHead in existing position. + } + + return OK; +} + +/* Updated old version remains for historical reasons +Status old_acknowledge(AckNode* r_aSeq, const size_t size, int& r_iHead, int& r_iTail, int32_t jrn, int32_t& w_ack, int32_t& w_timediff) { if (r_iHead >= r_iTail) { @@ -85,59 +212,60 @@ int acknowledge(Seq* r_aSeq, const size_t size, int& r_iHead, int& r_iTail, int3 for (int i = r_iTail, n = r_iHead; i < n; ++ i) { - // looking for identical ACK Seq. No. - if (seq == r_aSeq[i].iACKSeqNo) + // looking for identical ACK AckNode. No. + if (jrn == r_aSeq[i].iJournal) { // return the Data ACK it carried - r_ack = r_aSeq[i].iACK; + w_ack = r_aSeq[i].iAckSeq; // calculate RTT - const int rtt = count_microseconds(steady_clock::now() - r_aSeq[i].tsTimeStamp); + w_timediff = count_microseconds(steady_clock::now() - r_aSeq[i].tsTimeStamp); if (i + 1 == r_iHead) { r_iTail = r_iHead = 0; - r_aSeq[0].iACKSeqNo = SRT_SEQNO_NONE; + r_aSeq[0].iJournal = SRT_SEQNO_NONE; } else r_iTail = (i + 1) % size; - return rtt; + return OK; } } // Bad input, the ACK node has been overwritten - return -1; + return ROGUE; } // Head has exceeded the physical window boundary, so it is behind tail for (int j = r_iTail, n = r_iHead + size; j < n; ++ j) { - // looking for indentical ACK seq. no. - if (seq == r_aSeq[j % size].iACKSeqNo) + // looking for indentical ACK jrn. no. + if (jrn == r_aSeq[j % size].iJournal) { // return Data ACK j %= size; - r_ack = r_aSeq[j].iACK; + w_ack = r_aSeq[j].iAckSeq; // calculate RTT - const int rtt = count_microseconds(steady_clock::now() - r_aSeq[j].tsTimeStamp); + w_timediff = count_microseconds(steady_clock::now() - r_aSeq[j].tsTimeStamp); if (j == r_iHead) { r_iTail = r_iHead = 0; - r_aSeq[0].iACKSeqNo = -1; + r_aSeq[0].iJournal = -1; } else r_iTail = (j + 1) % size; - return rtt; + return OK; } } // bad input, the ACK node has been overwritten - return -1; + return ROGUE; } +*/ } //////////////////////////////////////////////////////////////////////////////// diff --git a/srtcore/window.h b/srtcore/window.h index b3baf0d60..407503b37 100644 --- a/srtcore/window.h +++ b/srtcore/window.h @@ -61,17 +61,28 @@ modified by #include "udt.h" #include "packet.h" -namespace ACKWindowTools +namespace ACKWindow { - struct Seq + struct AckNode { - int32_t iACKSeqNo; // Seq. No. for the ACK packet - int32_t iACK; // Data Seq. No. carried by the ACK packet + int32_t iJournal; // AckNode. No. for the ACK packet + int32_t iAckSeq; // Data AckNode. No. carried by the ACK packet srt::sync::steady_clock::time_point tsTimeStamp; // The timestamp when the ACK was sent }; - void store(Seq* r_aSeq, const size_t size, int& r_iHead, int& r_iTail, int32_t seq, int32_t ack); - int acknowledge(Seq* r_aSeq, const size_t size, int& r_iHead, int& r_iTail, int32_t seq, int32_t& r_ack); + enum Status + { + OK, //< Node found and removed, together with all older nodes + OLD, //< Given node is in the 1/2 of the sequence cycle before the oldest node + ROGUE, //< Given node is in the 1/2 of the sequence cycle after the newest node + WIPED //< Given node is within the range of contained nodes, but wasn't found + }; + + void store(AckNode* r_aSeq, const size_t size, int& r_iHead, int& r_iTail, int32_t seq, int32_t ack); + + Status acknowledge(AckNode* r_aSeq, const size_t size, int& r_iHead, int& r_iTail, int32_t seq, int32_t& r_ack, int& w_timediff); + Status old_acknowledge(AckNode* r_aSeq, const size_t size, int& r_iHead, int& r_iTail, int32_t seq, int32_t& r_ack, int& w_timediff); + } template @@ -83,7 +94,7 @@ class CACKWindow m_iHead(0), m_iTail(0) { - m_aSeq[0].iACKSeqNo = SRT_SEQNO_NONE; + m_aSeq[0].iJournal = SRT_SEQNO_NONE; } ~CACKWindow() {} @@ -92,9 +103,9 @@ class CACKWindow /// @param [in] seq ACK seq. no. /// @param [in] ack DATA ACK no. - void store(int32_t seq, int32_t ack) + void store(int32_t jrn, int32_t ackseq) { - return ACKWindowTools::store(m_aSeq, SIZE, m_iHead, m_iTail, seq, ack); + return ACKWindow::store(m_aSeq, SIZE, m_iHead, m_iTail, jrn, ackseq); } /// Search the ACK-2 "seq" in the window, find out the DATA "ack" and caluclate RTT . @@ -102,16 +113,29 @@ class CACKWindow /// @param [out] ack the DATA ACK no. that matches the ACK-2 no. /// @return RTT. - int acknowledge(int32_t seq, int32_t& r_ack) + ACKWindow::Status acknowledge(int32_t jrn, int32_t& w_ackseq, int32_t& w_timediff) { - return ACKWindowTools::acknowledge(m_aSeq, SIZE, m_iHead, m_iTail, seq, r_ack); + return ACKWindow::acknowledge(m_aSeq, SIZE, m_iHead, m_iTail, jrn, (w_ackseq), (w_timediff)); } + /* + ACKWindow::Status old_acknowledge(int32_t jrn, int32_t& w_ackseq, int32_t& w_timediff) + { + return ACKWindow::old_acknowledge(m_aSeq, SIZE, m_iHead, m_iTail, jrn, (w_ackseq), (w_timediff)); + } + unblock for testing + */ + + // For UT purposes + ACKWindow::AckNode first() { return m_aSeq[m_iTail]; } + ACKWindow::AckNode last() { return m_aSeq[(m_iHead - 1 + SIZE) % SIZE]; } + size_t size() { return (m_iHead - m_iTail + SIZE) % SIZE; } + private: - typedef ACKWindowTools::Seq Seq; + typedef ACKWindow::AckNode AckNode; - Seq m_aSeq[SIZE]; + AckNode m_aSeq[SIZE]; int m_iHead; // Pointer to the lastest ACK record int m_iTail; // Pointer to the oldest ACK record diff --git a/test/test_utilities.cpp b/test/test_utilities.cpp index 7deabe736..715511175 100644 --- a/test/test_utilities.cpp +++ b/test/test_utilities.cpp @@ -7,6 +7,7 @@ #define SRT_TEST_CIRCULAR_BUFFER #include "api.h" #include "common.h" +#include "window.h" using namespace std; @@ -271,3 +272,124 @@ TEST(ConfigString, Setting) EXPECT_EQ(s.size(), 0); EXPECT_TRUE(s.empty()); } + +struct AckData +{ + int32_t journal; + int32_t ackseq; +}; + +static void TestAckWindow(const std::array& data, size_t initpos, const std::string& casename) +{ + typedef CACKWindow<10> ackwindow_t; + ackwindow_t ackwindow; + + int b4 = data[0].journal - initpos; + + for (size_t i = 0; i < initpos; ++i) + { + ackwindow.store(b4, 0); + ++b4; + } + + for (auto& n: data) + ackwindow.store(n.journal, n.ackseq); + + // Now remove those initial ones + int32_t dummy1, dummy2; + ackwindow.acknowledge(data[0].journal-1, (dummy1), (dummy2)); + + ASSERT_EQ(ackwindow.first().iJournal, data[0].journal) << " (" << casename << ")"; + ASSERT_EQ(ackwindow.last().iJournal, data[4].journal) << " (" << casename << ")"; + ASSERT_EQ(ackwindow.size(), 5) << " (" << casename << ")"; + + int iack = 0; + int td =0; + + // Remove oldest node. Should go ok. + ACKWindow::Status stat = ackwindow.acknowledge(data[0].journal, (iack), (td)); + EXPECT_EQ(iack, data[0].ackseq) << " (" << casename << ")"; + EXPECT_EQ(stat, ACKWindow::OK) << " (" << casename << ")"; + EXPECT_EQ(ackwindow.size(), 4) << " (" << casename << ")"; + EXPECT_EQ(ackwindow.first().iJournal, data[1].journal) << " (" << casename << ")"; + + // Now remove the node +2 + stat = ackwindow.acknowledge(data[2].journal, (iack), (td)); + EXPECT_EQ(iack, data[2].ackseq) << " (" << casename << ")"; + EXPECT_EQ(stat, ACKWindow::OK) << " (" << casename << ")"; + EXPECT_EQ(ackwindow.size(), 2) << " (" << casename << ")"; + EXPECT_EQ(ackwindow.first().iJournal, data[3].journal) << " (" << casename << ")"; + + // Now remove too old node + stat = ackwindow.acknowledge(data[1].journal, (iack), (td)); + EXPECT_EQ(stat, ACKWindow::OLD) << "(" << casename << ")"; + // Like above - no changes were expected + EXPECT_EQ(ackwindow.size(), 2) << " (" << casename << ")"; + EXPECT_EQ(ackwindow.first().iJournal, data[3].journal) << " (" << casename << ")"; + + // And remove the node that wasn't inserted + int32_t wrongnode = data[4].journal+1; + stat = ackwindow.acknowledge(wrongnode, (iack), (td)); + EXPECT_EQ(stat, ACKWindow::ROGUE); + // Like above - no changes were expected + EXPECT_EQ(ackwindow.size(), 2) << " (" << casename << ")"; + EXPECT_EQ(ackwindow.first().iJournal, data[3].journal) << " (" << casename << ")"; + + // Now insert one value that jumps over. It's not exactly + // possible in the normal SRT runtime, but the reaction should be + // prepared just in case. + ackwindow.store(data[4].journal+2, data[4].ackseq); + // Now a search of data[4].journal+1 should fail appropriately. + stat = ackwindow.acknowledge(data[4].journal+1, (iack), (td)); + EXPECT_EQ(stat, ACKWindow::WIPED); +} + +TEST(ACKWindow, API) +{ + + // We have a situation with circular buffer with circular + // numbers with two different cirtulations. We need then + // permutations of 4 special plus 1 regular, in total: + // + // 1. Regular numbers in a regular range + // 2. Regular numbers in a split range + // 3. Number overflow in a regular range. + // 4. Number ovewflow in a split range in lower part + // 5. Number overflow in a split range in upper part + + int32_t seq0 = CSeqNo::m_iSeqNoTH; + + int32_t basej = 100; + std::array regular = { + AckData {basej + 0, seq0}, + AckData {basej + 1, seq0 + 10}, + AckData {basej + 2, seq0 + 20}, + AckData {basej + 3, seq0 + 30}, + AckData {basej + 4, seq0 + 40} + }; + + // 1. + TestAckWindow(regular, 0, "regular/0"); + + // 2. + TestAckWindow(regular, 7, "regular/7"); + + basej = CSeqNo::m_iMaxSeqNo-2; + + std::array overflow = { + AckData {basej + 0, seq0}, + AckData {basej + 1, seq0 + 10}, + AckData {basej + 2, seq0 + 20}, + AckData {basej + 3, seq0 + 30}, + AckData {basej + 4, seq0 + 40} + }; + + // 3. + TestAckWindow(overflow, 0, "overflow/0"); + + // 4. + TestAckWindow(overflow, 3, "overflow/3"); + + // 5. + TestAckWindow(overflow, 7, "overflow/7"); +} From 7b57995979d1fb926cdee9987e5cc3a272ddbd81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Wed, 24 Mar 2021 17:40:05 +0100 Subject: [PATCH 2/4] Use correctly early read time --- srtcore/window.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srtcore/window.cpp b/srtcore/window.cpp index ba39965d5..7e4438819 100644 --- a/srtcore/window.cpp +++ b/srtcore/window.cpp @@ -177,7 +177,7 @@ Status acknowledge(AckNode* r_aSeq, const size_t size, int& r_iHead, int& r_iTai // As long as none of the above did abnormal termination by early return, // pos contains our required node. w_ack = r_aSeq[found].iAckSeq; - w_timediff = count_microseconds(steady_clock::now() - r_aSeq[found].tsTimeStamp); + w_timediff = count_microseconds(now - r_aSeq[found].tsTimeStamp); int inext = found + 1; if (inext == r_iHead) From 58990ccba0545a4b9f2d3ff13d67670f800fe45b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Thu, 25 Mar 2021 09:45:52 +0100 Subject: [PATCH 3/4] Added lacking array include --- test/test_utilities.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/test/test_utilities.cpp b/test/test_utilities.cpp index 715511175..1effc8bd6 100644 --- a/test/test_utilities.cpp +++ b/test/test_utilities.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include From cc7755ce9ed4cad91397a82cc6d6c80fd57074c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Mon, 29 Mar 2021 13:00:29 +0200 Subject: [PATCH 4/4] CodeFactor --- test/test_utilities.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/test/test_utilities.cpp b/test/test_utilities.cpp index 1effc8bd6..f0d5011f2 100644 --- a/test/test_utilities.cpp +++ b/test/test_utilities.cpp @@ -347,7 +347,6 @@ static void TestAckWindow(const std::array& data, size_t initpos, co TEST(ACKWindow, API) { - // We have a situation with circular buffer with circular // numbers with two different cirtulations. We need then // permutations of 4 special plus 1 regular, in total: