From ce98b8ac53d69cd2b7df8e5f8b8724c1df988594 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Fri, 23 Dec 2022 12:50:42 +0100 Subject: [PATCH 1/3] [MAINT] Refax for CSndBuffer::readData to make the call form clearer --- srtcore/buffer_snd.cpp | 50 +++++++++++++++++++++++++++--------------- srtcore/buffer_snd.h | 25 ++++++++++++++++++--- srtcore/core.cpp | 25 ++++++++++----------- 3 files changed, 65 insertions(+), 35 deletions(-) diff --git a/srtcore/buffer_snd.cpp b/srtcore/buffer_snd.cpp index e945b166c..d63f97519 100644 --- a/srtcore/buffer_snd.cpp +++ b/srtcore/buffer_snd.cpp @@ -421,9 +421,10 @@ int32_t CSndBuffer::getMsgNoAt(const int offset) return p->getMsgSeq(); } -int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time_point& w_srctime, int& w_msglen) +int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time_point& w_srctime, Drop& w_drop) { - int32_t& msgno_bitset = w_packet.m_iMsgNo; + // NOTE: w_packet.m_iSeqNo is expected to be set to the value + // of the sequence number with which this packet should be sent. ScopedLock bufferguard(m_BufLock); @@ -438,19 +439,23 @@ int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time if (p == m_pLastBlock) { LOGC(qslog.Error, log << "CSndBuffer::readData: offset " << offset << " too large!"); - return 0; + return READ_NONE; } #if ENABLE_HEAVY_LOGGING const int32_t first_seq = p->m_iSeqNo; int32_t last_seq = p->m_iSeqNo; #endif + // This is rexmit request, so the packet should have the sequence number + // already set when it was once sent uniquely. + SRT_ASSERT(p->m_iSeqNo == w_packet.m_iSeqNo); + // Check if the block that is the next candidate to send (m_pCurrBlock pointing) is stale. // If so, then inform the caller that it should first take care of the whole // message (all blocks with that message id). Shift the m_pCurrBlock pointer // to the position past the last of them. Then return -1 and set the - // msgno_bitset return reference to the message id that should be dropped as + // msgno bitset packet field to the message id that should be dropped as // a whole. // After taking care of that, the caller should immediately call this function again, @@ -462,11 +467,11 @@ int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time if ((p->m_iTTL >= 0) && (count_milliseconds(steady_clock::now() - p->m_tsOriginTime) > p->m_iTTL)) { - int32_t msgno = p->getMsgSeq(); - w_msglen = 1; - p = p->m_pNext; - bool move = false; - while (p != m_pLastBlock && msgno == p->getMsgSeq()) + w_drop.msgno = p->getMsgSeq(); + int msglen = 1; + p = p->m_pNext; + bool move = false; + while (p != m_pLastBlock && w_drop.msgno == p->getMsgSeq()) { #if ENABLE_HEAVY_LOGGING last_seq = p->m_iSeqNo; @@ -476,18 +481,27 @@ int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time p = p->m_pNext; if (move) m_pCurrBlock = p; - w_msglen++; + msglen++; } HLOGC(qslog.Debug, - log << "CSndBuffer::readData: due to TTL exceeded, SEQ " << first_seq << " - " << last_seq << ", " - << w_msglen << " packets to drop, msgno=" << msgno); - - // If readData returns -1, then msgno_bitset is understood as a Message ID to drop. - // This means that in this case it should be written by the message sequence value only - // (not the whole 4-byte bitset written at PH_MSGNO). - msgno_bitset = msgno; - return -1; + log << "CSndBuffer::readData: due to TTL exceeded, %(" << first_seq << " - " << last_seq << "), " + << msglen << " packets to drop with #" << w_drop.msgno); + + // Theoretically as the seq numbers are being tracked, you should be able + // to simply take the sequence number from the block. But this is a new + // feature and should be only used after refax for the sender buffer to + // make it manage the sequence numbers inside, instead of by CUDT::m_iSndLastDataAck. + w_drop.seqno[Drop::BEGIN] = w_packet.m_iSeqNo; + w_drop.seqno[Drop::END] = CSeqNo::incseq(w_packet.m_iSeqNo, msglen - 1); + + // Note the rules: here `p` is pointing to the first block AFTER the + // message to be dropped, so the end sequence should be one behind + // the one for p. Note that the loop rolls until hitting the first + // packet that doesn't belong to the message or m_pLastBlock, which + // is past-the-end for the occupied range in the sender buffer. + SRT_ASSERT(w_drop.seqno[Drop::END] == CSeqNo::decseq(p->m_iSeqNo)); + return READ_DROP; } w_packet.m_pcData = p->m_pcData; diff --git a/srtcore/buffer_snd.h b/srtcore/buffer_snd.h index 5dce96ae0..d0dcdd453 100644 --- a/srtcore/buffer_snd.h +++ b/srtcore/buffer_snd.h @@ -116,6 +116,10 @@ class CSndBuffer SRT_ATTR_EXCLUDES(m_BufLock) int addBufferFromFile(std::fstream& ifs, int len); + // Special values that can be returned by readData. + static const int READ_NONE = 0; + static const int READ_DROP = -1; + /// Find data position to pack a DATA packet from the furthest reading point. /// @param [out] packet the packet to read. /// @param [out] origintime origin time stamp of the message @@ -130,14 +134,29 @@ class CSndBuffer SRT_ATTR_EXCLUDES(m_BufLock) time_point peekNextOriginal() const; + struct Drop + { + static const size_t BEGIN = 0, END = 1; + int32_t seqno[2]; + int32_t msgno; + }; /// Find data position to pack a DATA packet for a retransmission. + /// IMPORTANT: @a packet is [in,out] because it is expected to get set + /// the sequence number of the packet expected to be sent next. The sender + /// buffer normally doesn't handle sequence numbers and the consistency + /// between the sequence number of a packet already sent and kept in the + /// buffer is achieved by having the sequence number recorded in the + /// CUDT::m_iSndLastDataAck field that should represent the oldest packet + /// still in the buffer. /// @param [in] offset offset from the last ACK point (backward sequence number difference) - /// @param [out] packet the packet to read. + /// @param [in,out] packet the packet to read. /// @param [out] origintime origin time stamp of the message /// @param [out] msglen length of the message - /// @return Actual length of data read (return 0 if offset too large, -1 if TTL exceeded). + /// @retval >0 Length of the data read. + /// @retval READ_NONE No data available or @a offset points out of the buffer occupied space. + /// @retval READ_DROP The call requested data drop due to TTL exceeded, to be handled first. SRT_ATTR_EXCLUDES(m_BufLock) - int readData(const int offset, CPacket& w_packet, time_point& w_origintime, int& w_msglen); + int readData(const int offset, CPacket& w_packet, time_point& w_origintime, Drop& w_drop); /// Get the time of the last retransmission (if any) of the DATA packet. /// @param [in] offset offset from the last ACK point (backward sequence number difference) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index d6b4d7389..1822e8213 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -9071,28 +9071,25 @@ int srt::CUDT::packLostData(CPacket& w_packet) } } - int msglen; + CSndBuffer::Drop buffer_drop; steady_clock::time_point tsOrigin; - const int payload = m_pSndBuffer->readData(offset, (w_packet), (tsOrigin), (msglen)); - if (payload == -1) + const int payload = m_pSndBuffer->readData(offset, (w_packet), (tsOrigin), (buffer_drop)); + if (payload == CSndBuffer::READ_DROP) { - int32_t seqpair[2]; - seqpair[0] = w_packet.m_iSeqNo; - SRT_ASSERT(msglen >= 1); - seqpair[1] = CSeqNo::incseq(seqpair[0], msglen - 1); + SRT_ASSERT(CSeqNo::seqoff(buffer_drop.seqno[CSndBuffer::Drop::BEGIN], buffer_drop.seqno[CSndBuffer::Drop::END]) >= 0); HLOGC(qrlog.Debug, - log << CONID() << "loss-reported packets expired in SndBuf - requesting DROP: msgno=" - << MSGNO_SEQ::unwrap(w_packet.m_iMsgNo) << " msglen=" << msglen << " SEQ:" << seqpair[0] << " - " - << seqpair[1]); - sendCtrl(UMSG_DROPREQ, &w_packet.m_iMsgNo, seqpair, sizeof(seqpair)); + log << CONID() << "loss-reported packets expired in SndBuf - requesting DROP: #" + << buffer_drop.msgno << " %(" << buffer_drop.seqno[CSndBuffer::Drop::BEGIN] << " - " + << buffer_drop.seqno[CSndBuffer::Drop::END] << ")"); + sendCtrl(UMSG_DROPREQ, &buffer_drop.msgno, buffer_drop.seqno, sizeof(buffer_drop.seqno)); // skip all dropped packets - m_pSndLossList->removeUpTo(seqpair[1]); - m_iSndCurrSeqNo = CSeqNo::maxseq(m_iSndCurrSeqNo, seqpair[1]); + m_pSndLossList->removeUpTo(buffer_drop.seqno[CSndBuffer::Drop::END]); + m_iSndCurrSeqNo = CSeqNo::maxseq(m_iSndCurrSeqNo, buffer_drop.seqno[CSndBuffer::Drop::END]); continue; } - else if (payload == 0) + else if (payload == CSndBuffer::READ_NONE) continue; // The packet has been ecrypted, thus the authentication tag is expected to be stored From 333eaf2eab8979c6321b9940a19cbef159898f63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Fri, 23 Dec 2022 13:35:40 +0100 Subject: [PATCH 2/3] [MAINT] Removed reference fields from CPacket pinned to the SRT header --- srtcore/buffer_snd.cpp | 14 +-- srtcore/channel.cpp | 4 +- srtcore/core.cpp | 223 ++++++++++++++++++----------------- srtcore/packet.cpp | 6 +- srtcore/packet.h | 11 +- srtcore/packetfilter.cpp | 2 +- srtcore/queue.cpp | 4 +- srtcore/utilities.h | 1 + srtcore/window.h | 8 +- test/test_buffer_rcv.cpp | 19 +-- test/test_fec_rebuilding.cpp | 6 +- 11 files changed, 155 insertions(+), 143 deletions(-) diff --git a/srtcore/buffer_snd.cpp b/srtcore/buffer_snd.cpp index d63f97519..2f29a2c6c 100644 --- a/srtcore/buffer_snd.cpp +++ b/srtcore/buffer_snd.cpp @@ -320,7 +320,7 @@ int CSndBuffer::readData(CPacket& w_packet, steady_clock::time_point& w_srctime, w_packet.m_pcData = m_pCurrBlock->m_pcData; readlen = m_pCurrBlock->m_iLength; w_packet.setLength(readlen, m_iBlockLen); - w_packet.m_iSeqNo = m_pCurrBlock->m_iSeqNo; + w_packet.set_seqno(m_pCurrBlock->m_iSeqNo); // 1. On submission (addBuffer), the KK flag is set to EK_NOENC (0). // 2. The readData() is called to get the original (unique) payload not ever sent yet. @@ -346,7 +346,7 @@ int CSndBuffer::readData(CPacket& w_packet, steady_clock::time_point& w_srctime, } Block* p = m_pCurrBlock; - w_packet.m_iMsgNo = m_pCurrBlock->m_iMsgNoBitset; + w_packet.set_msgflags(m_pCurrBlock->m_iMsgNoBitset); w_srctime = m_pCurrBlock->m_tsOriginTime; m_pCurrBlock = m_pCurrBlock->m_pNext; @@ -448,7 +448,7 @@ int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time // This is rexmit request, so the packet should have the sequence number // already set when it was once sent uniquely. - SRT_ASSERT(p->m_iSeqNo == w_packet.m_iSeqNo); + SRT_ASSERT(p->m_iSeqNo == w_packet.seqno()); // Check if the block that is the next candidate to send (m_pCurrBlock pointing) is stale. @@ -492,8 +492,8 @@ int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time // to simply take the sequence number from the block. But this is a new // feature and should be only used after refax for the sender buffer to // make it manage the sequence numbers inside, instead of by CUDT::m_iSndLastDataAck. - w_drop.seqno[Drop::BEGIN] = w_packet.m_iSeqNo; - w_drop.seqno[Drop::END] = CSeqNo::incseq(w_packet.m_iSeqNo, msglen - 1); + w_drop.seqno[Drop::BEGIN] = w_packet.seqno(); + w_drop.seqno[Drop::END] = CSeqNo::incseq(w_packet.seqno(), msglen - 1); // Note the rules: here `p` is pointing to the first block AFTER the // message to be dropped, so the end sequence should be one behind @@ -514,7 +514,7 @@ int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time // encrypted, and with all ENC flags already set. So, the first call to send // the packet originally (the other overload of this function) must set these // flags. - w_packet.m_iMsgNo = p->m_iMsgNoBitset; + w_packet.set_msgflags(p->m_iMsgNoBitset); w_srctime = p->m_tsOriginTime; // This function is called when packet retransmission is triggered. @@ -522,7 +522,7 @@ int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time p->m_tsRexmitTime = steady_clock::now(); HLOGC(qslog.Debug, - log << CONID() << "CSndBuffer: getting packet %" << p->m_iSeqNo << " as per %" << w_packet.m_iSeqNo + log << CONID() << "CSndBuffer: getting packet %" << p->m_iSeqNo << " as per %" << w_packet.seqno() << " size=" << readlen << " to send [REXMIT]"); return readlen; diff --git a/srtcore/channel.cpp b/srtcore/channel.cpp index 30fd98778..4f752f290 100644 --- a/srtcore/channel.cpp +++ b/srtcore/channel.cpp @@ -613,8 +613,8 @@ void srt::CChannel::getPeerAddr(sockaddr_any& w_addr) const int srt::CChannel::sendto(const sockaddr_any& addr, CPacket& packet) const { HLOGC(kslog.Debug, - log << "CChannel::sendto: SENDING NOW DST=" << addr.str() << " target=@" << packet.m_iID - << " size=" << packet.getLength() << " pkt.ts=" << packet.m_iTimeStamp << " " << packet.Info()); + log << "CChannel::sendto: SENDING NOW DST=" << addr.str() << " target=@" << packet.id() + << " size=" << packet.getLength() << " pkt.ts=" << packet.timestamp() << " " << packet.Info()); #ifdef SRT_TEST_FAKE_LOSS diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 1822e8213..ef3734518 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -1963,7 +1963,7 @@ bool srt::CUDT::processSrtMsg(const CPacket *ctrlpkt) uint32_t *srtdata = (uint32_t *)ctrlpkt->m_pcData; size_t len = ctrlpkt->getLength(); int etype = ctrlpkt->getExtendedType(); - uint32_t ts = ctrlpkt->m_iTimeStamp; + uint32_t ts = ctrlpkt->timestamp(); int res = SRT_CMD_NONE; @@ -2494,7 +2494,7 @@ bool srt::CUDT::interpretSrtHandshake(const CHandShake& hs, return false; // don't interpret } - int rescmd = processSrtMsg_HSREQ(begin + 1, bytelen, hspkt.m_iTimeStamp, HS_VERSION_SRT1); + int rescmd = processSrtMsg_HSREQ(begin + 1, bytelen, hspkt.timestamp(), HS_VERSION_SRT1); // Interpreted? Then it should be responded with SRT_CMD_HSRSP. if (rescmd != SRT_CMD_HSRSP) { @@ -2521,7 +2521,7 @@ bool srt::CUDT::interpretSrtHandshake(const CHandShake& hs, return false; // don't interpret } - int rescmd = processSrtMsg_HSRSP(begin + 1, bytelen, hspkt.m_iTimeStamp, HS_VERSION_SRT1); + int rescmd = processSrtMsg_HSRSP(begin + 1, bytelen, hspkt.timestamp(), HS_VERSION_SRT1); // Interpreted? Then it should be responded with SRT_CMD_NONE. // (nothing to be responded for HSRSP, unless there was some kinda problem) if (rescmd != SRT_CMD_NONE) @@ -3525,7 +3525,7 @@ void srt::CUDT::startConnect(const sockaddr_any& serv_addr, int32_t forced_isn) // control of methods.) // ID = 0, connection request - reqpkt.m_iID = 0; + reqpkt.set_id(0); size_t hs_size = m_iMaxSRTPayloadSize; m_ConnReq.store_to((reqpkt.m_pcData), (hs_size)); @@ -3540,7 +3540,7 @@ void srt::CUDT::startConnect(const sockaddr_any& serv_addr, int32_t forced_isn) setPacketTS(reqpkt, tnow); HLOGC(cnlog.Debug, - log << CONID() << "CUDT::startConnect: REQ-TIME set HIGH (TimeStamp: " << reqpkt.m_iTimeStamp + log << CONID() << "CUDT::startConnect: REQ-TIME set HIGH (TimeStamp: " << reqpkt.timestamp() << "). SENDING HS: " << m_ConnReq.show()); /* @@ -3605,7 +3605,7 @@ void srt::CUDT::startConnect(const sockaddr_any& serv_addr, int32_t forced_isn) << " > 250 ms). size=" << reqpkt.getLength()); if (m_config.bRendezvous) - reqpkt.m_iID = m_ConnRes.m_iID; + reqpkt.set_id(m_ConnRes.m_iID); #if ENABLE_HEAVY_LOGGING { @@ -3823,17 +3823,17 @@ bool srt::CUDT::processAsyncConnectRequest(EReadStatus rst, // This should have got the original value returned from // processConnectResponse through processAsyncConnectResponse. - CPacket request; - request.setControl(UMSG_HANDSHAKE); - request.allocate(m_iMaxSRTPayloadSize); + CPacket reqpkt; + reqpkt.setControl(UMSG_HANDSHAKE); + reqpkt.allocate(m_iMaxSRTPayloadSize); const steady_clock::time_point now = steady_clock::now(); - setPacketTS(request, now); + setPacketTS(reqpkt, now); HLOGC(cnlog.Debug, log << CONID() << "processAsyncConnectRequest: REQ-TIME: HIGH. Should prevent too quick responses."); m_tsLastReqTime = now; // ID = 0, connection request - request.m_iID = !m_config.bRendezvous ? 0 : m_ConnRes.m_iID; + reqpkt.set_id(!m_config.bRendezvous ? 0 : m_ConnRes.m_iID); bool status = true; @@ -3844,7 +3844,7 @@ bool srt::CUDT::processAsyncConnectRequest(EReadStatus rst, if (cst == CONN_RENDEZVOUS) { HLOGC(cnlog.Debug, log << CONID() << "processAsyncConnectRequest: passing to processRendezvous"); - cst = processRendezvous(pResponse, serv_addr, rst, (request)); + cst = processRendezvous(pResponse, serv_addr, rst, (reqpkt)); if (cst == CONN_ACCEPT) { HLOGC(cnlog.Debug, @@ -3876,8 +3876,8 @@ bool srt::CUDT::processAsyncConnectRequest(EReadStatus rst, { // (this procedure will be also run for HSv4 rendezvous) HLOGC(cnlog.Debug, - log << CONID() << "processAsyncConnectRequest: serializing HS: buffer size=" << request.getLength()); - if (!createSrtHandshake(SRT_CMD_HSREQ, SRT_CMD_KMREQ, 0, 0, (request), (m_ConnReq))) + log << CONID() << "processAsyncConnectRequest: serializing HS: buffer size=" << reqpkt.getLength()); + if (!createSrtHandshake(SRT_CMD_HSREQ, SRT_CMD_KMREQ, 0, 0, (reqpkt), (m_ConnReq))) { // All 'false' returns from here are IPE-type, mostly "invalid argument" plus "all keys expired". LOGC(cnlog.Error, @@ -3889,7 +3889,7 @@ bool srt::CUDT::processAsyncConnectRequest(EReadStatus rst, HLOGC(cnlog.Debug, log << CONID() << "processAsyncConnectRequest: sending HS reqtype=" << RequestTypeStr(m_ConnReq.m_iReqType) - << " to socket " << request.m_iID << " size=" << request.getLength()); + << " to socket " << reqpkt.id() << " size=" << reqpkt.getLength()); } } @@ -3899,16 +3899,16 @@ bool srt::CUDT::processAsyncConnectRequest(EReadStatus rst, /* XXX Shouldn't it send a single response packet for the rejection? // Set the version to 0 as "handshake rejection" status and serialize it CHandShake zhs; - size_t size = request.getLength(); - zhs.store_to((request.m_pcData), (size)); - request.setLength(size); + size_t size = reqpkt.getLength(); + zhs.store_to((reqpkt.m_pcData), (size)); + reqpkt.setLength(size); */ } HLOGC(cnlog.Debug, log << CONID() << "processAsyncConnectRequest: setting REQ-TIME HIGH, SENDING HS:" << m_ConnReq.show()); m_tsLastReqTime = steady_clock::now(); - m_pSndQueue->sendto(serv_addr, request); + m_pSndQueue->sendto(serv_addr, reqpkt); return status; } @@ -5679,15 +5679,15 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& size_t size = m_iMaxSRTPayloadSize; // Allocate the maximum possible memory for an SRT payload. // This is a maximum you can send once. - CPacket response; - response.setControl(UMSG_HANDSHAKE); - response.allocate(size); + CPacket rsppkt; + rsppkt.setControl(UMSG_HANDSHAKE); + rsppkt.allocate(size); // This will serialize the handshake according to its current form. HLOGC(cnlog.Debug, log << CONID() << "acceptAndRespond: creating CONCLUSION response (HSv5: with HSRSP/KMRSP) buffer size=" << size); - if (!createSrtHandshake(SRT_CMD_HSRSP, SRT_CMD_KMRSP, kmdata, kmdatasize, (response), (w_hs))) + if (!createSrtHandshake(SRT_CMD_HSRSP, SRT_CMD_KMRSP, kmdata, kmdatasize, (rsppkt), (w_hs))) { LOGC(cnlog.Error, log << CONID() << "acceptAndRespond: error creating handshake response"); throw CUDTException(MJ_SETUP, MN_REJECTED, 0); @@ -5698,10 +5698,10 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& // To make sure what REALLY is being sent, parse back the handshake // data that have been just written into the buffer. CHandShake debughs; - debughs.load_from(response.m_pcData, response.getLength()); + debughs.load_from(rsppkt.m_pcData, rsppkt.getLength()); HLOGC(cnlog.Debug, log << CONID() << "acceptAndRespond: sending HS from agent @" - << debughs.m_iID << " to peer @" << response.m_iID + << debughs.m_iID << " to peer @" << rsppkt.id() << "HS:" << debughs.show()); } #endif @@ -5711,7 +5711,7 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& // When missed this message, the caller should not accept packets // coming as connected, but continue repeated handshake until finally // received the listener's handshake. - addressAndSend((response)); + addressAndSend((rsppkt)); } // This function is required to be called when a caller receives an INDUCTION @@ -5917,7 +5917,7 @@ void srt::CUDT::checkSndKMRefresh() void srt::CUDT::addressAndSend(CPacket& w_pkt) { - w_pkt.m_iID = m_PeerID; + w_pkt.set_id(m_PeerID); setPacketTS(w_pkt, steady_clock::now()); // NOTE: w_pkt isn't modified in this call, @@ -7625,8 +7625,8 @@ void srt::CUDT::sendCtrl(UDTMessageType pkttype, const int32_t* lparam, void* rp case UMSG_ACKACK: // 110 - Acknowledgement of Acknowledgement ctrlpkt.pack(pkttype, lparam); - ctrlpkt.m_iID = m_PeerID; - nbsent = m_pSndQueue->sendto(m_PeerAddr, ctrlpkt); + ctrlpkt.set_id(m_PeerID); + nbsent = m_pSndQueue->sendto(m_PeerAddr, ctrlpkt); break; @@ -7640,7 +7640,7 @@ void srt::CUDT::sendCtrl(UDTMessageType pkttype, const int32_t* lparam, void* rp size_t bytes = sizeof(*lossdata) * size; ctrlpkt.pack(pkttype, NULL, lossdata, bytes); - ctrlpkt.m_iID = m_PeerID; + ctrlpkt.set_id(m_PeerID); nbsent = m_pSndQueue->sendto(m_PeerAddr, ctrlpkt); enterCS(m_StatsLock); @@ -7661,7 +7661,7 @@ void srt::CUDT::sendCtrl(UDTMessageType pkttype, const int32_t* lparam, void* rp if (0 < losslen) { ctrlpkt.pack(pkttype, NULL, data, losslen * 4); - ctrlpkt.m_iID = m_PeerID; + ctrlpkt.set_id(m_PeerID); nbsent = m_pSndQueue->sendto(m_PeerAddr, ctrlpkt); enterCS(m_StatsLock); @@ -7691,7 +7691,7 @@ void srt::CUDT::sendCtrl(UDTMessageType pkttype, const int32_t* lparam, void* rp case UMSG_CGWARNING: // 100 - Congestion Warning ctrlpkt.pack(pkttype); - ctrlpkt.m_iID = m_PeerID; + ctrlpkt.set_id(m_PeerID); nbsent = m_pSndQueue->sendto(m_PeerAddr, ctrlpkt); m_tsLastWarningTime = steady_clock::now(); @@ -7700,14 +7700,14 @@ void srt::CUDT::sendCtrl(UDTMessageType pkttype, const int32_t* lparam, void* rp case UMSG_KEEPALIVE: // 001 - Keep-alive ctrlpkt.pack(pkttype); - ctrlpkt.m_iID = m_PeerID; + ctrlpkt.set_id(m_PeerID); nbsent = m_pSndQueue->sendto(m_PeerAddr, ctrlpkt); break; case UMSG_HANDSHAKE: // 000 - Handshake ctrlpkt.pack(pkttype, NULL, rparam, sizeof(CHandShake)); - ctrlpkt.m_iID = m_PeerID; + ctrlpkt.set_id(m_PeerID); nbsent = m_pSndQueue->sendto(m_PeerAddr, ctrlpkt); break; @@ -7716,21 +7716,21 @@ void srt::CUDT::sendCtrl(UDTMessageType pkttype, const int32_t* lparam, void* rp if (m_PeerID == 0) // Dont't send SHUTDOWN if we don't know peer ID. break; ctrlpkt.pack(pkttype); - ctrlpkt.m_iID = m_PeerID; + ctrlpkt.set_id(m_PeerID); nbsent = m_pSndQueue->sendto(m_PeerAddr, ctrlpkt); break; case UMSG_DROPREQ: // 111 - Msg drop request ctrlpkt.pack(pkttype, lparam, rparam, 8); - ctrlpkt.m_iID = m_PeerID; + ctrlpkt.set_id(m_PeerID); nbsent = m_pSndQueue->sendto(m_PeerAddr, ctrlpkt); break; case UMSG_PEERERROR: // 1000 - acknowledge the peer side a special error ctrlpkt.pack(pkttype, lparam); - ctrlpkt.m_iID = m_PeerID; + ctrlpkt.set_id(m_PeerID); nbsent = m_pSndQueue->sendto(m_PeerAddr, ctrlpkt); break; @@ -7817,7 +7817,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) { bufflock.unlock(); ctrlpkt.pack(UMSG_ACK, NULL, &ack, size); - ctrlpkt.m_iID = m_PeerID; + ctrlpkt.set_id(m_PeerID); nbsent = m_pSndQueue->sendto(m_PeerAddr, ctrlpkt); DebugAck(CONID() + "sendCtrl(lite): ", local_prevack, ack); return nbsent; @@ -8024,7 +8024,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) ctrlpkt.pack(UMSG_ACK, &m_iAckSeqNo, data, ACKD_FIELD_SIZE * ACKD_TOTAL_SIZE_SMALL); } - ctrlpkt.m_iID = m_PeerID; + ctrlpkt.set_id(m_PeerID); setPacketTS(ctrlpkt, steady_clock::now()); nbsent = m_pSndQueue->sendto(m_PeerAddr, ctrlpkt); DebugAck(CONID() + "sendCtrl(UMSG_ACK): ", local_prevack, ack); @@ -8716,18 +8716,18 @@ void srt::CUDT::processCtrlHS(const CPacket& ctrlpkt) log << CONID() << "processCtrl: responding HS reqtype=" << RequestTypeStr(initdata.m_iReqType) << (have_hsreq ? " WITH SRT HS response extensions" : "")); - CPacket response; - response.setControl(UMSG_HANDSHAKE); - response.allocate(m_iMaxSRTPayloadSize); + CPacket rsppkt; + rsppkt.setControl(UMSG_HANDSHAKE); + rsppkt.allocate(m_iMaxSRTPayloadSize); // If createSrtHandshake failed, don't send anything. Actually it can only fail on IPE. // There is also no possible IPE condition in case of HSv4 - for this version it will always return true. if (createSrtHandshake(SRT_CMD_HSRSP, SRT_CMD_KMRSP, kmdata, kmdatasize, - (response), (initdata))) + (rsppkt), (initdata))) { - response.m_iID = m_PeerID; - setPacketTS(response, steady_clock::now()); - const int nbsent = m_pSndQueue->sendto(m_PeerAddr, response); + rsppkt.set_id(m_PeerID); + setPacketTS(rsppkt, steady_clock::now()); + const int nbsent = m_pSndQueue->sendto(m_PeerAddr, rsppkt); if (nbsent) { m_tsLastSndTime.store(steady_clock::now()); @@ -8849,7 +8849,7 @@ void srt::CUDT::processCtrl(const CPacket &ctrlpkt) HLOGC(inlog.Debug, log << CONID() << "incoming UMSG:" << ctrlpkt.getType() << " (" - << MessageTypeStr(ctrlpkt.getType(), ctrlpkt.getExtendedType()) << ") socket=%" << ctrlpkt.m_iID); + << MessageTypeStr(ctrlpkt.getType(), ctrlpkt.getExtendedType()) << ") socket=%" << ctrlpkt.id()); switch (ctrlpkt.getType()) { @@ -9024,37 +9024,44 @@ int srt::CUDT::packLostData(CPacket& w_packet) const steady_clock::time_point time_now = steady_clock::now(); const steady_clock::time_point time_nak = time_now - microseconds_from(m_iSRTT - 4 * m_iRTTVar); - while ((w_packet.m_iSeqNo = m_pSndLossList->popLostSeq()) >= 0) + for (;;) { + w_packet.set_seqno(m_pSndLossList->popLostSeq()); + if (w_packet.seqno() == SRT_SEQNO_NONE) + break; + // XXX See the note above the m_iSndLastDataAck declaration in core.h // This is the place where the important sequence numbers for // sender buffer are actually managed by this field here. - const int offset = CSeqNo::seqoff(m_iSndLastDataAck, w_packet.m_iSeqNo); + const int offset = CSeqNo::seqoff(m_iSndLastDataAck, w_packet.seqno()); if (offset < 0) { // XXX Likely that this will never be executed because if the upper // sequence is not in the sender buffer, then most likely the loss // was completely ignored. LOGC(qrlog.Error, - log << CONID() << "IPE/EPE: packLostData: LOST packet negative offset: seqoff(m_iSeqNo " - << w_packet.m_iSeqNo << ", m_iSndLastDataAck " << m_iSndLastDataAck << ")=" << offset - << ". Continue"); + log << CONID() << "IPE/EPE: packLostData: LOST packet negative offset: seqoff(seqno() " + << w_packet.seqno() << ", m_iSndLastDataAck " << m_iSndLastDataAck << ")=" << offset + << ". Continue, request DROP"); // No matter whether this is right or not (maybe the attack case should be // considered, and some LOSSREPORT flood prevention), send the drop request // to the peer. int32_t seqpair[2] = { - w_packet.m_iSeqNo, + w_packet.seqno(), CSeqNo::decseq(m_iSndLastDataAck) }; - w_packet.m_iMsgNo = 0; // Message number is not known, setting all 32 bits to 0. HLOGC(qrlog.Debug, - log << CONID() << "PEER reported LOSS not from the sending buffer - requesting DROP: msg=" - << MSGNO_SEQ::unwrap(w_packet.m_iMsgNo) << " SEQ:" << seqpair[0] << " - " << seqpair[1] << "(" + log << CONID() << "PEER reported LOSS not from the sending buffer - requesting DROP: #" + << MSGNO_SEQ::unwrap(w_packet.msgflags()) << " SEQ:" << seqpair[0] << " - " << seqpair[1] << "(" << (-offset) << " packets)"); - sendCtrl(UMSG_DROPREQ, &w_packet.m_iMsgNo, seqpair, sizeof(seqpair)); + // See interpretation in processCtrlDropReq(). We don't know the message number, + // so we request that the drop be exclusively sequence number based. + int32_t msgno = SRT_MSGNO_CONTROL; + + sendCtrl(UMSG_DROPREQ, &msgno, seqpair, sizeof(seqpair)); continue; } @@ -9064,7 +9071,7 @@ int srt::CUDT::packLostData(CPacket& w_packet) if (tsLastRexmit >= time_nak) { HLOGC(qrlog.Debug, log << CONID() << "REXMIT: ignoring seqno " - << w_packet.m_iSeqNo << ", last rexmit " << (is_zero(tsLastRexmit) ? "never" : FormatTime(tsLastRexmit)) + << w_packet.seqno() << ", last rexmit " << (is_zero(tsLastRexmit) ? "never" : FormatTime(tsLastRexmit)) << " RTT=" << m_iSRTT << " RTTVar=" << m_iRTTVar << " now=" << FormatTime(time_now)); continue; @@ -9114,7 +9121,7 @@ int srt::CUDT::packLostData(CPacket& w_packet) // So, set here the rexmit flag if the peer understands it. if (m_bPeerRexmitFlag) { - w_packet.m_iMsgNo |= PACKET_SND_REXMIT; + w_packet.set_msgflags(w_packet.msgflags() | PACKET_SND_REXMIT); } setDataPacketTS(w_packet, tsOrigin); @@ -9215,7 +9222,7 @@ void srt::CUDT::setPacketTS(CPacket& p, const time_point& ts) enterCS(m_StatsLock); const time_point tsStart = m_stats.tsStartTime; leaveCS(m_StatsLock); - p.m_iTimeStamp = makeTS(ts, tsStart); + p.set_timestamp(makeTS(ts, tsStart)); } void srt::CUDT::setDataPacketTS(CPacket& p, const time_point& ts) @@ -9227,14 +9234,14 @@ void srt::CUDT::setDataPacketTS(CPacket& p, const time_point& ts) if (!m_bPeerTsbPd) { // If TSBPD is disabled, use the current time as the source (timestamp using the sending time). - p.m_iTimeStamp = makeTS(steady_clock::now(), tsStart); + p.set_timestamp(makeTS(steady_clock::now(), tsStart)); return; } // TODO: Might be better for performance to ensure this condition is always false, and just use SRT_ASSERT here. if (ts < tsStart) { - p.m_iTimeStamp = makeTS(steady_clock::now(), tsStart); + p.set_timestamp(makeTS(steady_clock::now(), tsStart)); LOGC(qslog.Warn, log << CONID() << "setPacketTS: reference time=" << FormatTime(ts) << " is in the past towards start time=" << FormatTime(tsStart) @@ -9243,7 +9250,7 @@ void srt::CUDT::setDataPacketTS(CPacket& p, const time_point& ts) } // Use the provided source time for the timestamp. - p.m_iTimeStamp = makeTS(ts, tsStart); + p.set_timestamp(makeTS(ts, tsStart)); } bool srt::CUDT::isRetransmissionAllowed(const time_point& tnow SRT_ATR_UNUSED) @@ -9340,14 +9347,14 @@ std::pair srt::CUDT::packData(CPacket& w_packet) new_packet_packed = true; // every 16 (0xF) packets, a packet pair is sent - if ((w_packet.m_iSeqNo & PUMASK_SEQNO_PROBE) == 0) + if ((w_packet.seqno() & PUMASK_SEQNO_PROBE) == 0) probe = true; payload = (int) w_packet.getLength(); IF_HEAVY_LOGGING(reason = "normal"); } - w_packet.m_iID = m_PeerID; // Set the destination SRT socket ID. + w_packet.set_id(m_PeerID); // Set the destination SRT socket ID. if (new_packet_packed && m_PacketFilter) { @@ -9357,7 +9364,7 @@ std::pair srt::CUDT::packData(CPacket& w_packet) #if ENABLE_HEAVY_LOGGING // Required because of referring to MessageFlagStr() HLOGC(qslog.Debug, - log << CONID() << "packData: " << reason << " packet seq=" << w_packet.m_iSeqNo << " (ACK=" << m_iSndLastAck + log << CONID() << "packData: " << reason << " packet seq=" << w_packet.seqno() << " (ACK=" << m_iSndLastAck << " ACKDATA=" << m_iSndLastDataAck << " MSG/FLAGS: " << w_packet.MessageFlagStr() << ")"); #endif @@ -9376,7 +9383,7 @@ std::pair srt::CUDT::packData(CPacket& w_packet) // Left untouched for historical reasons. // Might be possible that it was because of that this is send from // different thread than the rest of the signals. - // m_pSndTimeWindow->onPktSent(w_packet.m_iTimeStamp); + // m_pSndTimeWindow->onPktSent(w_packet.timestamp()); enterCS(m_StatsLock); m_stats.sndr.sent.count(payload); @@ -9462,7 +9469,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) // Fortunately the group itself isn't being accessed. if (m_parent->m_GroupOf) { - const int packetspan = CSeqNo::seqoff(m_iSndCurrSeqNo, w_packet.m_iSeqNo); + const int packetspan = CSeqNo::seqoff(m_iSndCurrSeqNo, w_packet.seqno()); if (packetspan > 0) { // After increasing by 1, but being previously set as ISN-1, this should be == ISN, @@ -9476,7 +9483,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) // initialized from ISN just after connection. LOGC(qslog.Note, log << CONID() << "packData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo - << " from SCHEDULING sequence " << w_packet.m_iSeqNo << " for the first packet: DIFF=" + << " from SCHEDULING sequence " << w_packet.seqno() << " for the first packet: DIFF=" << packetspan << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength())); } else @@ -9484,7 +9491,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) // There will be a serious data discrepancy between the agent and the peer. LOGC(qslog.Error, log << CONID() << "IPE: packData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo - << " from SCHEDULING sequence " << w_packet.m_iSeqNo << " in the middle of transition: DIFF=" + << " from SCHEDULING sequence " << w_packet.seqno() << " in the middle of transition: DIFF=" << packetspan << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength())); } @@ -9493,7 +9500,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) // Don't do it if the difference isn't positive or exceeds the threshold. int32_t seqpair[2]; seqpair[0] = m_iSndCurrSeqNo; - seqpair[1] = CSeqNo::decseq(w_packet.m_iSeqNo); + seqpair[1] = CSeqNo::decseq(w_packet.seqno()); const int32_t no_msgno = 0; LOGC(qslog.Debug, log << CONID() << "packData: Sending DROPREQ: SEQ: " << seqpair[0] << " - " << seqpair[1] << " (" @@ -9504,17 +9511,17 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) // packet are not present in the buffer (preadte the send buffer). // Override extraction sequence with scheduling sequence. - m_iSndCurrSeqNo = w_packet.m_iSeqNo; + m_iSndCurrSeqNo = w_packet.seqno(); ScopedLock ackguard(m_RecvAckLock); - m_iSndLastAck = w_packet.m_iSeqNo; - m_iSndLastDataAck = w_packet.m_iSeqNo; - m_iSndLastFullAck = w_packet.m_iSeqNo; - m_iSndLastAck2 = w_packet.m_iSeqNo; + m_iSndLastAck = w_packet.seqno(); + m_iSndLastDataAck = w_packet.seqno(); + m_iSndLastFullAck = w_packet.seqno(); + m_iSndLastAck2 = w_packet.seqno(); } else if (packetspan < 0) { LOGC(qslog.Error, - log << CONID() << "IPE: packData: SCHEDULING sequence " << w_packet.m_iSeqNo + log << CONID() << "IPE: packData: SCHEDULING sequence " << w_packet.seqno() << " is behind of EXTRACTION sequence " << m_iSndCurrSeqNo << ", dropping this packet: DIFF=" << packetspan << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength())); // XXX: Probably also change the socket state to broken? @@ -9526,15 +9533,15 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) { HLOGC(qslog.Debug, log << CONID() << "packData: Applying EXTRACTION sequence " << m_iSndCurrSeqNo - << " over SCHEDULING sequence " << w_packet.m_iSeqNo << " for socket not in group:" - << " DIFF=" << CSeqNo::seqcmp(m_iSndCurrSeqNo, w_packet.m_iSeqNo) + << " over SCHEDULING sequence " << w_packet.seqno() << " for socket not in group:" + << " DIFF=" << CSeqNo::seqcmp(m_iSndCurrSeqNo, w_packet.seqno()) << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength())); // Do this always when not in a group. - w_packet.m_iSeqNo = m_iSndCurrSeqNo; + w_packet.set_seqno(m_iSndCurrSeqNo); } // Set missing fields before encrypting the packet, because those fields might be used for encryption. - w_packet.m_iID = m_PeerID; // Destination SRT Socket ID + w_packet.set_id(m_PeerID); // Destination SRT Socket ID setDataPacketTS(w_packet, tsOrigin); if (kflg != EK_NOENC) @@ -9554,7 +9561,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet) } #if SRT_DEBUG_TRACE_SND - g_snd_logger.state.iPktSeqno = w_packet.m_iSeqNo; + g_snd_logger.state.iPktSeqno = w_packet.seqno(); g_snd_logger.state.isRetransmitted = w_packet.getRexmitFlag(); g_snd_logger.trace(); #endif @@ -9725,7 +9732,7 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& bool adding_successful = true; - const int32_t bufidx = CSeqNo::seqoff(bufseq, rpkt.m_iSeqNo); + const int32_t bufidx = CSeqNo::seqoff(bufseq, rpkt.seqno()); IF_HEAVY_LOGGING(const char *exc_type = "EXPECTED"); @@ -9750,7 +9757,7 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& // which never contains losses, so discarding this packet does not // discard a loss coverage, even if this were past ACK. - if (bufidx < 0 || CSeqNo::seqcmp(rpkt.m_iSeqNo, m_iRcvLastAck) < 0) + if (bufidx < 0 || CSeqNo::seqcmp(rpkt.seqno(), m_iRcvLastAck) < 0) { time_point pts = getPktTsbPdTime(NULL, rpkt); @@ -9763,7 +9770,7 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& m_stats.rcvr.recvdBelated.count(rpkt.getLength()); leaveCS(m_StatsLock); HLOGC(qrlog.Debug, - log << CONID() << "RECEIVED: %" << rpkt.m_iSeqNo << " bufidx=" << bufidx << " (BELATED/" + log << CONID() << "RECEIVED: %" << rpkt.seqno() << " bufidx=" << bufidx << " (BELATED/" << s_rexmitstat_str[pktrexmitflag] << ") with ACK %" << m_iRcvLastAck << " FLAGS: " << rpkt.MessageFlagStr()); continue; @@ -9785,7 +9792,7 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& LOGC(qrlog.Error, log << CONID() << "SEQUENCE DISCREPANCY. BREAKING CONNECTION." - " %" << rpkt.m_iSeqNo + " %" << rpkt.seqno() << " buffer=(%" << bufseq << ":%" << m_iRcvCurrSeqNo // -1 = size to last index << "+%" << CSeqNo::incseq(bufseq, int(m_pRcvBuffer->capacity()) - 1) @@ -9796,7 +9803,7 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& } else { - LOGC(qrlog.Warn, log << CONID() << "No room to store incoming packet seqno " << rpkt.m_iSeqNo + LOGC(qrlog.Warn, log << CONID() << "No room to store incoming packet seqno " << rpkt.seqno() << ", insert offset " << bufidx << ". " << m_pRcvBuffer->strFullnessState(m_iRcvLastAck, steady_clock::now()) ); @@ -9887,7 +9894,7 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& // Empty buffer info in case of groupwise receiver. // There's no way to obtain this information here. - LOGC(qrlog.Debug, log << CONID() << "RECEIVED: %" << rpkt.m_iSeqNo + LOGC(qrlog.Debug, log << CONID() << "RECEIVED: %" << rpkt.seqno() << bufinfo.str() << " RSL=" << expectspec.str() << " SN=" << s_rexmitstat_str[pktrexmitflag] @@ -9901,12 +9908,12 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& { HLOGC(qrlog.Debug, log << CONID() - << "CONTIGUITY CHECK: sequence distance: " << CSeqNo::seqoff(m_iRcvCurrSeqNo, rpkt.m_iSeqNo)); + << "CONTIGUITY CHECK: sequence distance: " << CSeqNo::seqoff(m_iRcvCurrSeqNo, rpkt.seqno())); - if (CSeqNo::seqcmp(rpkt.m_iSeqNo, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0) // Loss detection. + if (CSeqNo::seqcmp(rpkt.seqno(), CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0) // Loss detection. { int32_t seqlo = CSeqNo::incseq(m_iRcvCurrSeqNo); - int32_t seqhi = CSeqNo::decseq(rpkt.m_iSeqNo); + int32_t seqhi = CSeqNo::decseq(rpkt.seqno()); w_srt_loss_seqs.push_back(make_pair(seqlo, seqhi)); HLOGC(qrlog.Debug, log << "pkt/LOSS DETECTED: %" << seqlo << " - %" << seqhi); } @@ -9914,9 +9921,9 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& // Update the current largest sequence number that has been received. // Or it is a retransmitted packet, remove it from receiver loss list. - if (CSeqNo::seqcmp(rpkt.m_iSeqNo, m_iRcvCurrSeqNo) > 0) + if (CSeqNo::seqcmp(rpkt.seqno(), m_iRcvCurrSeqNo) > 0) { - m_iRcvCurrSeqNo = rpkt.m_iSeqNo; // Latest possible received + m_iRcvCurrSeqNo = rpkt.seqno(); // Latest possible received } else { @@ -9964,7 +9971,7 @@ int srt::CUDT::processData(CUnit* in_unit) // Search the sequence in the loss record. rexmit_reason = " by "; ScopedLock lock(m_RcvLossLock); - if (!m_pRcvLossList->find(packet.m_iSeqNo, packet.m_iSeqNo)) + if (!m_pRcvLossList->find(packet.seqno(), packet.seqno())) rexmit_reason += "BLIND"; else rexmit_reason += "NAKREPORT"; @@ -10008,7 +10015,7 @@ int srt::CUDT::processData(CUnit* in_unit) // Conditions and any extra data required for the packet // this function will extract and test as needed. - const bool unordered = CSeqNo::seqcmp(packet.m_iSeqNo, m_iRcvCurrSeqNo) <= 0; + const bool unordered = CSeqNo::seqcmp(packet.seqno(), m_iRcvCurrSeqNo) <= 0; // Retransmitted and unordered packets do not provide expected measurement. // We expect the 16th and 17th packet to be sent regularly, @@ -10034,7 +10041,7 @@ int srt::CUDT::processData(CUnit* in_unit) // supply the missing packet(s), and the loss will no longer be visible for the code that follows. if (packet.getMsgSeq(m_bPeerRexmitFlag) != SRT_MSGNO_CONTROL) // disregard filter-control packets, their seq may mean nothing { - const int diff = CSeqNo::seqoff(m_iRcvCurrPhySeqNo, packet.m_iSeqNo); + const int diff = CSeqNo::seqoff(m_iRcvCurrPhySeqNo, packet.seqno()); // Difference between these two sequence numbers is expected to be: // 0 - duplicated last packet (theory only) // 1 - subsequent packet (alright) @@ -10050,13 +10057,13 @@ int srt::CUDT::processData(CUnit* in_unit) HLOGC(qrlog.Debug, log << CONID() << "LOSS STATS: n=" << loss << " SEQ: [" << CSeqNo::incseq(m_iRcvCurrPhySeqNo) << " " - << CSeqNo::decseq(packet.m_iSeqNo) << "]"); + << CSeqNo::decseq(packet.seqno()) << "]"); } if (diff > 0) { // Record if it was further than latest - m_iRcvCurrPhySeqNo = packet.m_iSeqNo; + m_iRcvCurrPhySeqNo = packet.seqno(); } } @@ -10122,7 +10129,7 @@ int srt::CUDT::processData(CUnit* in_unit) // offset from RcvLastAck in RcvBuffer must remain valid between seqoff() and addData() UniqueLock recvbuf_acklock(m_RcvBufferLock); // Needed for possibly check for needsQuickACK. - const bool incoming_belated = (CSeqNo::seqcmp(in_unit->m_Packet.m_iSeqNo, m_pRcvBuffer->getStartSeqNo()) < 0); + const bool incoming_belated = (CSeqNo::seqcmp(in_unit->m_Packet.seqno(), m_pRcvBuffer->getStartSeqNo()) < 0); const int res = handleSocketPacketReception(incoming, (new_inserted), @@ -10407,7 +10414,7 @@ void srt::CUDT::updateIdleLinkFrom(CUDT* source) void srt::CUDT::unlose(const CPacket &packet) { ScopedLock lg(m_RcvLossLock); - int32_t sequence = packet.m_iSeqNo; + int32_t sequence = packet.seqno(); m_pRcvLossList->remove(sequence); // Rest of this code concerns only the "belated lossreport" feature. @@ -10427,7 +10434,7 @@ void srt::CUDT::unlose(const CPacket &packet) { HLOGC(qrlog.Debug, log << "received out-of-band packet %" << sequence); - const int seqdiff = abs(CSeqNo::seqcmp(m_iRcvCurrSeqNo, packet.m_iSeqNo)); + const int seqdiff = abs(CSeqNo::seqcmp(m_iRcvCurrSeqNo, packet.seqno())); enterCS(m_StatsLock); m_stats.traceReorderDistance = max(seqdiff, m_stats.traceReorderDistance); leaveCS(m_StatsLock); @@ -10642,7 +10649,7 @@ int32_t srt::CUDT::bake(const sockaddr_any& addr, int32_t current_cookie, int co int srt::CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet) { // XXX ASSUMPTIONS: - // [[using assert(packet.m_iID == 0)]] + // [[using assert(packet.id() == 0)]] HLOGC(cnlog.Debug, log << CONID() << "processConnectRequest: received a connection request"); @@ -10723,7 +10730,7 @@ int srt::CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet) // array need not be aligned to int32_t - changed to union in a hope that using int32_t // inside a union will enforce whole union to be aligned to int32_t. hs.m_iCookie = cookie_val; - packet.m_iID = hs.m_iID; + packet.set_id(hs.m_iID); // Ok, now's the time. The listener sets here the version 5 handshake, // even though the request was 4. This is because the old client would @@ -10791,7 +10798,7 @@ int srt::CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet) HLOGC(cnlog.Debug, log << CONID() << "processConnectRequest: ... correct (ORIGINAL) cookie. Proceeding."); } - int32_t id = hs.m_iID; + SRTSOCKET id = hs.m_iID; // HANDSHAKE: The old client sees the version that does not match HS_VERSION_UDT4 (5). // In this case it will respond with URQ_ERROR_REJECT. Rest of the data are the same @@ -10839,8 +10846,8 @@ int srt::CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet) hs.m_iReqType = URQFailure(m_RejectReason); size_t size = CHandShake::m_iContentSize; hs.store_to((packet.m_pcData), (size)); - packet.m_iID = id; - setPacketTS(packet, steady_clock::now()); + packet.set_id(id); + setPacketTS((packet), steady_clock::now()); HLOGC(cnlog.Debug, log << CONID() << "processConnectRequest: SENDING HS (e): " << hs.show()); m_pSndQueue->sendto(addr, packet); } @@ -10951,7 +10958,7 @@ int srt::CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet) CPacket rsp; setPacketTS((rsp), steady_clock::now()); rsp.pack(UMSG_SHUTDOWN); - rsp.m_iID = m_PeerID; + rsp.set_id(m_PeerID); m_pSndQueue->sendto(addr, rsp); } else @@ -10962,7 +10969,7 @@ int srt::CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet) size_t size = CHandShake::m_iContentSize; hs.store_to((packet.m_pcData), (size)); packet.setLength(size); - packet.m_iID = id; + packet.set_id(id); setPacketTS(packet, steady_clock::now()); HLOGC(cnlog.Debug, log << CONID() << "processConnectRequest: SENDING HS (a): " << hs.show()); m_pSndQueue->sendto(addr, packet); diff --git a/srtcore/packet.cpp b/srtcore/packet.cpp index bf5db8c0d..a855552cd 100644 --- a/srtcore/packet.cpp +++ b/srtcore/packet.cpp @@ -179,10 +179,6 @@ CPacket::CPacket() : m_nHeader() // Silences GCC 12 warning "used uninitialized". , m_extra_pad() , m_data_owned(false) - , m_iSeqNo((int32_t&)(m_nHeader[SRT_PH_SEQNO])) - , m_iMsgNo((int32_t&)(m_nHeader[SRT_PH_MSGNO])) - , m_iTimeStamp((int32_t&)(m_nHeader[SRT_PH_TIMESTAMP])) - , m_iID((int32_t&)(m_nHeader[SRT_PH_ID])) , m_pcData((char*&)(m_PacketVector[PV_DATA].dataRef())) { m_nHeader.clear(); @@ -600,7 +596,7 @@ inline void SprintSpecialWord(std::ostream& os, int32_t val) std::string CPacket::Info() { std::ostringstream os; - os << "TARGET=@" << m_iID << " "; + os << "TARGET=@" << id() << " "; if (isControl()) { diff --git a/srtcore/packet.h b/srtcore/packet.h index e6d2516a9..4f17d4df4 100644 --- a/srtcore/packet.h +++ b/srtcore/packet.h @@ -349,12 +349,15 @@ class CPacket CPacket(const CPacket&); public: - int32_t& m_iSeqNo; // alias: sequence number - int32_t& m_iMsgNo; // alias: message number - int32_t& m_iTimeStamp; // alias: timestamp - int32_t& m_iID; // alias: destination SRT socket ID char*& m_pcData; // alias: payload (data packet) / control information fields (control packet) + SRTU_PROPERTY_RO(SRTSOCKET, id, SRTSOCKET(m_nHeader[SRT_PH_ID])); + SRTU_PROPERTY_WO_ARG(SRTSOCKET, id, m_nHeader[SRT_PH_ID] = int32_t(arg)); + + SRTU_PROPERTY_RW(int32_t, seqno, m_nHeader[SRT_PH_SEQNO]); + SRTU_PROPERTY_RW(int32_t, msgflags, m_nHeader[SRT_PH_MSGNO]); + SRTU_PROPERTY_RW(int32_t, timestamp, m_nHeader[SRT_PH_TIMESTAMP]); + // Experimental: sometimes these references don't work! char* getData(); char* release(); diff --git a/srtcore/packetfilter.cpp b/srtcore/packetfilter.cpp index 1b05c4f4e..e7a9ca2bb 100644 --- a/srtcore/packetfilter.cpp +++ b/srtcore/packetfilter.cpp @@ -226,7 +226,7 @@ bool srt::PacketFilter::packControlPacket(int32_t seq, int kflg, CPacket& w_pack // - Crypto // - Message Number // will be set to 0/false - w_packet.m_iMsgNo = SRT_MSGNO_CONTROL | MSGNO_PACKET_BOUNDARY::wrap(PB_SOLO); + w_packet.set_msgflags(SRT_MSGNO_CONTROL | MSGNO_PACKET_BOUNDARY::wrap(PB_SOLO)); // ... and then fix only the Crypto flags w_packet.setMsgCryptoFlags(EncryptionKeySpec(kflg)); diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 1bdff1848..bb6920310 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -894,7 +894,7 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst // Need a stub value for a case when there's no unit provided ("storage depleted" case). // It should be normally NOT IN USE because in case of "storage depleted", rst != RST_OK. - const SRTSOCKET dest_id = pkt ? pkt->m_iID : 0; + const SRTSOCKET dest_id = pkt ? pkt->id() : 0; // If no socket were qualified for further handling, finish here. // Otherwise toRemove and toProcess contain items to handle. @@ -1380,7 +1380,7 @@ srt::EReadStatus srt::CRcvQueue::worker_RetrieveUnit(int32_t& w_id, CUnit*& w_un if (rst == RST_OK) { - w_id = w_unit->m_Packet.m_iID; + w_id = w_unit->m_Packet.id(); HLOGC(qrlog.Debug, log << "INCOMING PACKET: FROM=" << w_addr.str() << " BOUND=" << m_pChannel->bindAddressAny().str() << " " << w_unit->m_Packet.Info()); diff --git a/srtcore/utilities.h b/srtcore/utilities.h index 31e05b205..a9f3bdba1 100644 --- a/srtcore/utilities.h +++ b/srtcore/utilities.h @@ -1089,6 +1089,7 @@ inline ValueType avg_iir_w(ValueType old_value, ValueType new_value, size_t new_ #define SRTU_PROPERTY_RR(type, name, field) type name() { return field; } #define SRTU_PROPERTY_RO(type, name, field) type name() const { return field; } #define SRTU_PROPERTY_WO(type, name, field) void set_##name(type arg) { field = arg; } +#define SRTU_PROPERTY_WO_ARG(type, name, expr) void set_##name(type arg) { expr; } #define SRTU_PROPERTY_WO_CHAIN(otype, type, name, field) otype& set_##name(type arg) { field = arg; return *this; } #define SRTU_PROPERTY_RW(type, name, field) SRTU_PROPERTY_RO(type, name, field); SRTU_PROPERTY_WO(type, name, field) #define SRTU_PROPERTY_RRW(type, name, field) SRTU_PROPERTY_RR(type, name, field); SRTU_PROPERTY_WO(type, name, field) diff --git a/srtcore/window.h b/srtcore/window.h index ecc4a4947..a9dba46ba 100644 --- a/srtcore/window.h +++ b/srtcore/window.h @@ -236,7 +236,7 @@ class CPktTimeWindow: CPktTimeWindowTools /// Shortcut to test a packet for possible probe 1 or 2 void probeArrival(const CPacket& pkt, bool unordered) { - const int inorder16 = pkt.m_iSeqNo & PUMASK_SEQNO_PROBE; + const int inorder16 = pkt.seqno() & PUMASK_SEQNO_PROBE; // for probe1, we want 16th packet if (inorder16 == 0) @@ -257,7 +257,7 @@ class CPktTimeWindow: CPktTimeWindowTools /// Record the arrival time of the first probing packet. void probe1Arrival(const CPacket& pkt, bool unordered) { - if (unordered && pkt.m_iSeqNo == m_Probe1Sequence) + if (unordered && pkt.seqno() == m_Probe1Sequence) { // Reset the starting probe into "undefined", when // a packet has come as retransmitted before the @@ -267,7 +267,7 @@ class CPktTimeWindow: CPktTimeWindowTools } m_tsProbeTime = sync::steady_clock::now(); - m_Probe1Sequence = pkt.m_iSeqNo; // Record the sequence where 16th packet probe was taken + m_Probe1Sequence = pkt.seqno(); // Record the sequence where 16th packet probe was taken } /// Record the arrival time of the second probing packet and the interval between packet pairs. @@ -282,7 +282,7 @@ class CPktTimeWindow: CPktTimeWindowTools // expected packet pair, behave as if the 17th packet was lost. // no start point yet (or was reset) OR not very next packet - if (m_Probe1Sequence == SRT_SEQNO_NONE || CSeqNo::incseq(m_Probe1Sequence) != pkt.m_iSeqNo) + if (m_Probe1Sequence == SRT_SEQNO_NONE || CSeqNo::incseq(m_Probe1Sequence) != pkt.seqno()) return; // Grab the current time before trying to acquire diff --git a/test/test_buffer_rcv.cpp b/test/test_buffer_rcv.cpp index 15356c8f2..db0db317a 100644 --- a/test/test_buffer_rcv.cpp +++ b/test/test_buffer_rcv.cpp @@ -54,21 +54,26 @@ class CRcvBufferReadMsg EXPECT_NE(unit, nullptr); CPacket& packet = unit->m_Packet; - packet.m_iSeqNo = seqno; - packet.m_iTimeStamp = ts; + packet.set_seqno(seqno); + packet.set_timestamp(ts); packet.setLength(m_payload_sz); - generatePayload(packet.data(), packet.getLength(), packet.m_iSeqNo); + generatePayload(packet.data(), packet.getLength(), packet.seqno()); - packet.m_iMsgNo = PacketBoundaryBits(PB_SUBSEQUENT); + int32_t pktMsgFlags = PacketBoundaryBits(PB_SUBSEQUENT); if (pb_first) - packet.m_iMsgNo |= PacketBoundaryBits(PB_FIRST); + pktMsgFlags |= PacketBoundaryBits(PB_FIRST); if (pb_last) - packet.m_iMsgNo |= PacketBoundaryBits(PB_LAST); + pktMsgFlags |= PacketBoundaryBits(PB_LAST); + + if (!out_of_order) + { + pktMsgFlags |= MSGNO_PACKET_INORDER::wrap(1); + } + packet.set_msgflags(pktMsgFlags); if (!out_of_order) { - packet.m_iMsgNo |= MSGNO_PACKET_INORDER::wrap(1); EXPECT_TRUE(packet.getMsgOrderFlag()); } diff --git a/test/test_fec_rebuilding.cpp b/test/test_fec_rebuilding.cpp index 46afd8981..91d269221 100644 --- a/test/test_fec_rebuilding.cpp +++ b/test/test_fec_rebuilding.cpp @@ -792,7 +792,7 @@ TEST_F(TestFECRebuilding, NoRebuild) // - Crypto // - Message Number // will be set to 0/false - fecpkt->m_iMsgNo = MSGNO_PACKET_BOUNDARY::wrap(PB_SOLO); + fecpkt->set_msgflags(MSGNO_PACKET_BOUNDARY::wrap(PB_SOLO)); // ... and then fix only the Crypto flags fecpkt->setMsgCryptoFlags(EncryptionKeySpec(0)); @@ -869,7 +869,7 @@ TEST_F(TestFECRebuilding, Rebuild) // - Crypto // - Message Number // will be set to 0/false - fecpkt->m_iMsgNo = MSGNO_PACKET_BOUNDARY::wrap(PB_SOLO); + fecpkt->set_msgflags(MSGNO_PACKET_BOUNDARY::wrap(PB_SOLO)); // ... and then fix only the Crypto flags fecpkt->setMsgCryptoFlags(EncryptionKeySpec(0)); @@ -887,7 +887,7 @@ TEST_F(TestFECRebuilding, Rebuild) // Set artificially the SN_REXMIT flag in the skipped source packet // because the rebuilt packet shall have REXMIT flag set. - skipped.m_iMsgNo |= MSGNO_REXMIT::wrap(true); + skipped.set_msgflags(skipped.msgflags() | MSGNO_REXMIT::wrap(true)); // Compare the header EXPECT_EQ(skipped.getHeader()[SRT_PH_SEQNO], rebuilt.hdr[SRT_PH_SEQNO]); From 5c3abbc5d36753117bc200825d178c13d0edc1c9 Mon Sep 17 00:00:00 2001 From: Sektor van Skijlen Date: Fri, 16 Feb 2024 15:36:08 +0100 Subject: [PATCH 3/3] Update test/test_buffer_rcv.cpp Co-authored-by: Maxim Sharabayko --- test/test_buffer_rcv.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/test_buffer_rcv.cpp b/test/test_buffer_rcv.cpp index ffac0170c..554c6aae5 100644 --- a/test/test_buffer_rcv.cpp +++ b/test/test_buffer_rcv.cpp @@ -68,9 +68,7 @@ class CRcvBufferReadMsg pktMsgFlags |= PacketBoundaryBits(PB_LAST); if (!out_of_order) - { pktMsgFlags |= MSGNO_PACKET_INORDER::wrap(1); - } packet.set_msgflags(pktMsgFlags); if (!out_of_order)