Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refax: smaller fixes and added utilities #2504

Merged
merged 16 commits into from
Sep 19, 2023
14 changes: 14 additions & 0 deletions srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,18 @@ class CRcvBuffer
inline int incPos(int pos, int inc = 1) const { return (pos + inc) % m_szSize; }
inline int decPos(int pos) const { return (pos - 1) >= 0 ? (pos - 1) : int(m_szSize - 1); }
inline int offPos(int pos1, int pos2) const { return (pos2 >= pos1) ? (pos2 - pos1) : int(m_szSize + pos2 - pos1); }
inline int cmpPos(int pos2, int pos1) const
{
// XXX maybe not the best implementation, but this keeps up to the rule
int off1 = pos1 >= m_iStartPos ? pos1 - m_iStartPos : pos1 + m_szSize - m_iStartPos;
int off2 = pos2 >= m_iStartPos ? pos2 - m_iStartPos : pos2 + m_szSize - m_iStartPos;

return off2 - off1;
}

// NOTE: Assumes that pUnit != NULL
CPacket& packetAt(int pos) { return m_entries[pos].pUnit->m_Packet; }
const CPacket& packetAt(int pos) const { return m_entries[pos].pUnit->m_Packet; }

private:
void countBytes(int pkts, int bytes);
Expand Down Expand Up @@ -341,6 +353,8 @@ class CRcvBuffer
time_point getTsbPdTimeBase(uint32_t usPktTimestamp) const;
void updateTsbPdTimeBase(uint32_t usPktTimestamp);

bool isTsbPd() const { return m_tsbpd.isEnabled(); }

/// Form a string of the current buffer fullness state.
/// number of packets acknowledged, TSBPD readiness, etc.
std::string strFullnessState(bool enable_debug_log, int iFirstUnackSeqNo, const time_point& tsNow) const;
Expand Down
6 changes: 5 additions & 1 deletion srtcore/buffer_snd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,11 @@ int CSndBuffer::readData(CPacket& w_packet, steady_clock::time_point& w_srctime,
continue;
}

HLOGC(bslog.Debug, log << CONID() << "CSndBuffer: extracting packet size=" << readlen << " to send");
HLOGC(bslog.Debug, log << CONID() << "CSndBuffer: picked up packet to send: size=" << readlen
<< " #" << w_packet.getMsgSeq()
<< " %" << w_packet.m_iSeqNo
<< " !" << BufferStamp(w_packet.m_pcData, w_packet.getLength()));

break;
}

Expand Down
20 changes: 20 additions & 0 deletions srtcore/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -1416,6 +1416,26 @@ inline std::string SrtVersionString(int version)

bool SrtParseConfig(std::string s, SrtConfig& w_config);


inline std::string FormatLossArray(const std::vector< std::pair<int32_t, int32_t> >& lra)
{
std::ostringstream os;

os << "[ ";
for (std::vector< std::pair<int32_t, int32_t> >::const_iterator i = lra.begin(); i != lra.end(); ++i)
{
int len = CSeqNo::seqoff(i->first, i->second);
os << "%" << i->first;
if (len > 1)
os << "+" << len;
os << " ";
}

os << "]";
return os.str();
}


} // namespace srt

#endif
140 changes: 22 additions & 118 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -955,9 +955,11 @@ void srt::CUDT::open()
m_tsLastRspAckTime = currtime;
m_tsLastSndTime.store(currtime);

#if ENABLE_BONDING
m_tsUnstableSince = steady_clock::time_point();
m_tsFreshActivation = steady_clock::time_point();
m_tsWarySince = steady_clock::time_point();
#endif

m_iReXmitCount = 1;
m_iPktCount = 0;
Expand Down Expand Up @@ -9282,14 +9284,16 @@ bool srt::CUDT::isRetransmissionAllowed(const time_point& tnow SRT_ATR_UNUSED)
return true;
}

std::pair<bool, steady_clock::time_point> srt::CUDT::packData(CPacket& w_packet)
bool srt::CUDT::packData(CPacket& w_packet, steady_clock::time_point& w_nexttime)
{
int payload = 0;
bool probe = false;
bool new_packet_packed = false;

const steady_clock::time_point enter_time = steady_clock::now();

w_nexttime = enter_time;

if (!is_zero(m_tsNextSendTime) && enter_time > m_tsNextSendTime)
{
m_tdSendTimeDiff = m_tdSendTimeDiff.load() + (enter_time - m_tsNextSendTime);
Expand All @@ -9304,7 +9308,7 @@ std::pair<bool, steady_clock::time_point> srt::CUDT::packData(CPacket& w_packet)
// start the dissolving process, this process will
// not be started until this function is finished.
if (!m_bOpened)
return std::make_pair(false, enter_time);
return false;

payload = isRetransmissionAllowed(enter_time)
? packLostData((w_packet))
Expand Down Expand Up @@ -9332,7 +9336,7 @@ std::pair<bool, steady_clock::time_point> srt::CUDT::packData(CPacket& w_packet)
{
m_tsNextSendTime = steady_clock::time_point();
m_tdSendTimeDiff = steady_clock::duration();
return std::make_pair(false, enter_time);
return false;
}
new_packet_packed = true;

Expand Down Expand Up @@ -9414,7 +9418,9 @@ std::pair<bool, steady_clock::time_point> srt::CUDT::packData(CPacket& w_packet)
#endif
}

return std::make_pair(payload >= 0, m_tsNextSendTime);
w_nexttime = m_tsNextSendTime;

return payload >= 0; // XXX shouldn't be > 0 ? == 0 is only when buffer range exceeded.
}

bool srt::CUDT::packUniqueData(CPacket& w_packet)
Expand All @@ -9425,7 +9431,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet)
if (cwnd <= flightspan)
{
HLOGC(qslog.Debug,
log << CONID() << "packData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_dCongestionWindow
log << CONID() << "packUniqueData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_dCongestionWindow
<< ")=" << cwnd << " seqlen=(" << m_iSndLastAck << "-" << m_iSndCurrSeqNo << ")=" << flightspan);
return false;
}
Expand All @@ -9443,10 +9449,13 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet)
{
// Some packets were skipped due to TTL expiry.
m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo, pktskipseqno);
HLOGC(qslog.Debug, log << "packUniqueData: reading skipped " << pktskipseqno << " seq up to %" << m_iSndCurrSeqNo
<< " due to TTL expiry");
}

if (pld_size == 0)
{
HLOGC(qslog.Debug, log << "packUniqueData: nothing extracted from the buffer");
return false;
}

Expand All @@ -9472,15 +9481,15 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet)
// no ACK to be awaited. We can screw up all the variables that are
// initialized from ISN just after connection.
LOGC(qslog.Note,
log << CONID() << "packData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo
log << CONID() << "packUniqueData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo
<< " from SCHEDULING sequence " << w_packet.m_iSeqNo << " for the first packet: DIFF="
<< packetspan << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength()));
}
else
{
// There will be a serious data discrepancy between the agent and the peer.
LOGC(qslog.Error,
log << CONID() << "IPE: packData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo
log << CONID() << "IPE: packUniqueData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo
<< " from SCHEDULING sequence " << w_packet.m_iSeqNo << " in the middle of transition: DIFF="
<< packetspan << " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength()));
}
Expand All @@ -9493,7 +9502,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet)
seqpair[1] = CSeqNo::decseq(w_packet.m_iSeqNo);
const int32_t no_msgno = 0;
LOGC(qslog.Debug,
log << CONID() << "packData: Sending DROPREQ: SEQ: " << seqpair[0] << " - " << seqpair[1] << " ("
log << CONID() << "packUniqueData: Sending DROPREQ: SEQ: " << seqpair[0] << " - " << seqpair[1] << " ("
<< packetspan << " packets)");
sendCtrl(UMSG_DROPREQ, &no_msgno, seqpair, sizeof(seqpair));
// In case when this message is lost, the peer will still get the
Expand Down Expand Up @@ -9522,7 +9531,7 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet)
#endif
{
HLOGC(qslog.Debug,
log << CONID() << "packData: Applying EXTRACTION sequence " << m_iSndCurrSeqNo
log << CONID() << "packUniqueData: Applying EXTRACTION sequence " << m_iSndCurrSeqNo
<< " over SCHEDULING sequence " << w_packet.m_iSeqNo << " for socket not in group:"
<< " DIFF=" << CSeqNo::seqcmp(m_iSndCurrSeqNo, w_packet.m_iSeqNo)
<< " STAMP=" << BufferStamp(w_packet.m_pcData, w_packet.getLength()));
Expand Down Expand Up @@ -10359,67 +10368,6 @@ void srt::CUDT::updateIdleLinkFrom(CUDT* source)
setInitialRcvSeq(source->m_iRcvLastSkipAck);
}

// XXX This function is currently unused. It should be fixed and put into use.
// See the blocked call in CUDT::processData().
// XXX REVIEW LOCKS WHEN REACTIVATING!
srt::CUDT::loss_seqs_t srt::CUDT::defaultPacketArrival(void* vself, CPacket& pkt)
{
// [[using affinity(m_pRcvBuffer->workerThread())]];
CUDT* self = (CUDT*)vself;
loss_seqs_t output;

// XXX When an alternative packet arrival callback is installed
// in case of groups, move this part to the groupwise version.

if (self->m_parent->m_GroupOf)
{
groups::SocketData* gi = self->m_parent->m_GroupMemberData;
if (gi->rcvstate < SRT_GST_RUNNING) // PENDING or IDLE, tho PENDING is unlikely
{
HLOGC(qrlog.Debug, log << "defaultPacketArrival: IN-GROUP rcv state transition to RUNNING. NOT checking for loss");
gi->rcvstate = SRT_GST_RUNNING;
return output;
}
}

const int initial_loss_ttl = (self->m_bPeerRexmitFlag) ? self->m_iReorderTolerance : 0;

int seqdiff = CSeqNo::seqcmp(pkt.m_iSeqNo, self->m_iRcvCurrSeqNo);

HLOGC(qrlog.Debug, log << "defaultPacketArrival: checking sequence " << pkt.m_iSeqNo
<< " against latest " << self->m_iRcvCurrSeqNo << " (distance: " << seqdiff << ")");

// Loss detection.
if (seqdiff > 1) // packet is later than the very subsequent packet
{
const int32_t seqlo = CSeqNo::incseq(self->m_iRcvCurrSeqNo);
const int32_t seqhi = CSeqNo::decseq(pkt.m_iSeqNo);

{
// If loss found, insert them to the receiver loss list
ScopedLock lg (self->m_RcvLossLock);
self->m_pRcvLossList->insert(seqlo, seqhi);

if (initial_loss_ttl)
{
// pack loss list for (possibly belated) NAK
// The LOSSREPORT will be sent in a while.
self->m_FreshLoss.push_back(CRcvFreshLoss(seqlo, seqhi, initial_loss_ttl));
HLOGF(qrlog.Debug, "defaultPacketArrival: added loss sequence %d-%d (%d) with tolerance %d", seqlo, seqhi,
1+CSeqNo::seqcmp(seqhi, seqlo), initial_loss_ttl);
}
}

if (!initial_loss_ttl)
{
// old code; run immediately when tolerance = 0
// or this feature isn't used because of the peer
output.push_back(make_pair(seqlo, seqhi));
}
}

return output;
}
#endif

/// This function is called when a packet has arrived, which was behind the current
Expand Down Expand Up @@ -10498,53 +10446,8 @@ void srt::CUDT::unlose(const CPacket &packet)
if (m_bPeerRexmitFlag == 0 || m_iReorderTolerance == 0)
return;

size_t i = 0;
int had_ttl = 0;
for (i = 0; i < m_FreshLoss.size(); ++i)
{
had_ttl = m_FreshLoss[i].ttl;
switch (m_FreshLoss[i].revoke(sequence))
{
case CRcvFreshLoss::NONE:
continue; // Not found. Search again.

case CRcvFreshLoss::STRIPPED:
goto breakbreak; // Found and the modification is applied. We're done here.

case CRcvFreshLoss::DELETE:
// No more elements. Kill it.
m_FreshLoss.erase(m_FreshLoss.begin() + i);
// Every loss is unique. We're done here.
goto breakbreak;

case CRcvFreshLoss::SPLIT:
// Oh, this will be more complicated. This means that it was in between.
{
// So create a new element that will hold the upper part of the range,
// and this one modify to be the lower part of the range.

// Keep the current end-of-sequence value for the second element
int32_t next_end = m_FreshLoss[i].seq[1];

// seq-1 set to the end of this element
m_FreshLoss[i].seq[1] = CSeqNo::decseq(sequence);
// seq+1 set to the begin of the next element
int32_t next_begin = CSeqNo::incseq(sequence);

// Use position of the NEXT element because insertion happens BEFORE pointed element.
// Use the same TTL (will stay the same in the other one).
m_FreshLoss.insert(m_FreshLoss.begin() + i + 1,
CRcvFreshLoss(next_begin, next_end, m_FreshLoss[i].ttl));
}
goto breakbreak;
}
}

// Could have made the "return" instruction instead of goto, but maybe there will be something
// to add in future, so keeping that.
breakbreak:;

if (i != m_FreshLoss.size())
int had_ttl = 0;
if (CRcvFreshLoss::removeOne((m_FreshLoss), sequence, (&had_ttl)))
{
HLOGF(qrlog.Debug, "sequence %d removed from belated lossreport record", sequence);
}
Expand Down Expand Up @@ -11171,7 +11074,7 @@ bool srt::CUDT::checkExpTimer(const steady_clock::time_point& currtime, int chec
// Application will detect this when it calls any UDT methods next time.
//
HLOGC(xtlog.Debug,
log << CONID() << "CONNECTION EXPIRED after " << count_milliseconds(currtime - last_rsp_time) << "ms");
log << CONID() << "CONNECTION EXPIRED after " << FormatDuration<DUNIT_MS>(currtime - last_rsp_time) << " - BREAKING");
m_bClosing = true;
m_bBroken = true;
m_iBrokenCounter = 30;
Expand Down Expand Up @@ -11394,6 +11297,7 @@ void srt::CUDT::completeBrokenConnectionDependencies(int errorcode)
{
// XXX This somehow can cause a deadlock
// uglobal()->close(m_parent);
LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group.");
m_parent->setBrokenClosed();
}
#endif
Expand Down
22 changes: 15 additions & 7 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,10 @@ class CUDT
int32_t schedSeqNo() const { return m_iSndNextSeqNo; }
bool overrideSndSeqNo(int32_t seq);

#if ENABLE_BONDING
sync::steady_clock::time_point lastRspTime() const { return m_tsLastRspTime.load(); }
sync::steady_clock::time_point freshActivationStart() const { return m_tsFreshActivation; }
#endif

int32_t rcvSeqNo() const { return m_iRcvCurrSeqNo; }
int flowWindowSize() const { return m_iFlowWindowSize; }
Expand Down Expand Up @@ -387,6 +389,11 @@ class CUDT
return (int32_t) sync::count_microseconds(from_time - tsStartTime);
}

static void setPacketTS(CPacket& p, const time_point& start_time, const time_point& ts)
{
p.m_iTimeStamp = makeTS(ts, start_time);
}

/// @brief Set the timestamp field of the packet using the provided value (no check)
/// @param p the packet structure to set the timestamp on.
/// @param ts timestamp to use as a source for packet timestamp.
Expand Down Expand Up @@ -1044,13 +1051,12 @@ class CUDT

/// Pack in CPacket the next data to be send.
///
/// @param packet [in, out] a CPacket structure to fill
/// @param packet [out] a CPacket structure to fill
/// @param nexttime [out] Time when this socket should be next time picked up for processing.
///
/// @return A pair of values is returned (is_payload_valid, timestamp).
/// If is_payload_valid is false, there was nothing packed for sending,
/// and the timestamp value should be ignored.
/// The timestamp is the full source/origin timestamp of the data.
std::pair<bool, time_point> packData(CPacket& packet);
/// @retval true A packet was extracted for sending, the socket should be rechecked at @a nexttime
/// @retval false Nothing was extracted for sending, @a nexttime should be ignored
bool packData(CPacket& packet, time_point& nexttime);

int processData(CUnit* unit);

Expand Down Expand Up @@ -1134,10 +1140,12 @@ class CUDT
static const int PACKETPAIR_MASK = 0xF;

private: // Timers functions
#if ENABLE_BONDING
time_point m_tsFreshActivation; // GROUPS: time of fresh activation of the link, or 0 if past the activation phase or idle
time_point m_tsUnstableSince; // GROUPS: time since unexpected ACK delay experienced, or 0 if link seems healthy
time_point m_tsWarySince; // GROUPS: time since an unstable link has first some response

#endif

static const int BECAUSE_NO_REASON = 0, // NO BITS
BECAUSE_ACK = 1 << 0,
BECAUSE_LITEACK = 1 << 1,
Expand Down
Loading