Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Minor internal logging format changes. #2939

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,20 @@ class CRcvBuffer
return (m_iMaxPosOff == 0);
}

/// Returns the currently used number of cells, including
/// gaps with empty cells, or in other words, the distance
/// between the initial position and the youngest received packet.
size_t size() const
{
return m_iMaxPosOff;
}

// Returns true if the buffer is full. Requires locking.
bool full() const
{
return size() == capacity();
}

/// Return buffer capacity.
/// One slot had to be empty in order to tell the difference between "empty buffer" and "full buffer".
/// E.g. m_iFirstNonreadPos would again point to m_iStartPos if m_szSize entries are added continiously.
Expand Down Expand Up @@ -333,9 +347,8 @@ class CRcvBuffer
EntryStatus status;
};

//static Entry emptyEntry() { return Entry { NULL, EntryState_Empty }; }

FixedArray<Entry> m_entries;
typedef FixedArray<Entry> entries_t;
entries_t m_entries;

const size_t m_szSize; // size of the array of units (buffer)
CUnitQueue* m_pUnitQueue; // the shared unit queue
Expand Down
54 changes: 40 additions & 14 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ void srt::CUDT::construct()
m_iPeerTsbPdDelay_ms = 0;
m_bPeerTsbPd = false;
m_bTsbPd = false;
m_bTsbPdAckWakeup = false;
m_bTsbPdNeedsWakeup = false;
m_bGroupTsbPd = false;
m_bPeerTLPktDrop = false;
m_bBufferWasFull = false;
Expand Down Expand Up @@ -5405,7 +5405,7 @@ void * srt::CUDT::tsbpd(void* param)
CUniqueSync recvdata_lcc (self->m_RecvLock, self->m_RecvDataCond);
CSync tsbpd_cc(self->m_RcvTsbPdCond, recvdata_lcc.locker());

self->m_bTsbPdAckWakeup = true;
self->m_bTsbPdNeedsWakeup = true;
while (!self->m_bClosing)
{
steady_clock::time_point tsNextDelivery; // Next packet delivery time
Expand All @@ -5425,6 +5425,21 @@ void * srt::CUDT::tsbpd(void* param)
const bool is_time_to_deliver = !is_zero(info.tsbpd_time) && (tnow >= info.tsbpd_time);
tsNextDelivery = info.tsbpd_time;

#if ENABLE_HEAVY_LOGGING
if (info.seqno == SRT_SEQNO_NONE)
{
HLOGC(tslog.Debug, log << self->CONID() << "sok/tsbpd: packet check: NO PACKETS");
}
else
{
HLOGC(tslog.Debug, log << self->CONID() << "sok/tsbpd: packet check: %"
<< info.seqno << " T=" << FormatTime(tsNextDelivery)
<< " diff-now-playtime=" << FormatDuration(tnow - tsNextDelivery)
<< " ready=" << is_time_to_deliver
<< " ondrop=" << info.seq_gap);
}
#endif

if (!self->m_bTLPktDrop)
{
rxready = !info.seq_gap && is_time_to_deliver;
Expand Down Expand Up @@ -5470,8 +5485,8 @@ void * srt::CUDT::tsbpd(void* param)
if (rxready)
{
HLOGC(tslog.Debug,
log << self->CONID() << "tsbpd: PLAYING PACKET seq=" << info.seqno << " (belated "
<< (count_milliseconds(steady_clock::now() - info.tsbpd_time)) << "ms)");
log << self->CONID() << "tsbpd: PLAYING PACKET seq=" << info.seqno << " (belated "
<< FormatDuration<DUNIT_MS>(steady_clock::now() - info.tsbpd_time) << ")");
/*
* There are packets ready to be delivered
* signal a waiting "recv" call if there is any data available
Expand Down Expand Up @@ -5534,19 +5549,21 @@ void * srt::CUDT::tsbpd(void* param)
if (self->m_bClosing)
break;

SRT_ATR_UNUSED bool bWokeUpOnSignal = true;

if (!is_zero(tsNextDelivery))
{
IF_HEAVY_LOGGING(const steady_clock::duration timediff = tsNextDelivery - tnow);
/*
* Buffer at head of queue is not ready to play.
* Schedule wakeup when it will be.
*/
self->m_bTsbPdAckWakeup = false;
self->m_bTsbPdNeedsWakeup = false;
HLOGC(tslog.Debug,
log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << info.seqno
<< " T=" << FormatTime(tsNextDelivery) << " - waiting " << count_milliseconds(timediff) << "ms");
log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << info.seqno
<< " T=" << FormatTime(tsNextDelivery) << " - waiting " << FormatDuration<DUNIT_MS>(timediff));
THREAD_PAUSED();
tsbpd_cc.wait_until(tsNextDelivery);
bWokeUpOnSignal = tsbpd_cc.wait_until(tsNextDelivery);
THREAD_RESUMED();
}
else
Expand All @@ -5563,13 +5580,15 @@ void * srt::CUDT::tsbpd(void* param)
* - Closing the connection
*/
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: no data, scheduling wakeup at ack");
self->m_bTsbPdAckWakeup = true;
self->m_bTsbPdNeedsWakeup = true;
THREAD_PAUSED();
tsbpd_cc.wait();
THREAD_RESUMED();
}

HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP!!!");
HLOGC(tslog.Debug,
log << self->CONID() << "tsbpd: WAKE UP [" << (bWokeUpOnSignal ? "signal" : "timeout") << "]!!! - "
<< "NOW=" << FormatTime(steady_clock::now()));
}
THREAD_EXIT();
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: EXITING");
Expand Down Expand Up @@ -6951,6 +6970,12 @@ bool srt::CUDT::isRcvBufferReadyNoLock() const
return m_pRcvBuffer->isRcvDataReady(steady_clock::now());
}

bool srt::CUDT::isRcvBufferFull() const
{
ScopedLock lck(m_RcvBufferLock);
return m_pRcvBuffer->full();
}

// int by_exception: accepts values of CUDTUnited::ErrorHandling:
// - 0 - by return value
// - 1 - by exception
Expand Down Expand Up @@ -7738,8 +7763,8 @@ bool srt::CUDT::updateCC(ETransmissionEvent evt, const EventVariant arg)
m_iCongestionWindow = cgwindow;
#if ENABLE_HEAVY_LOGGING
HLOGC(rslog.Debug,
log << CONID() << "updateCC: updated values from congctl: interval=" << count_microseconds(m_tdSendInterval) << " us ("
<< "tk (" << m_CongCtl->pktSndPeriod_us() << "us) cgwindow="
log << CONID() << "updateCC: updated values from congctl: interval=" << FormatDuration<DUNIT_US>(m_tdSendInterval)
<< " (cfg:" << m_CongCtl->pktSndPeriod_us() << "us) cgwindow="
<< std::setprecision(3) << cgwindow);
#endif
}
Expand Down Expand Up @@ -8141,7 +8166,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
/* Newly acknowledged data, signal TsbPD thread */
CUniqueSync tslcc (m_RecvLock, m_RcvTsbPdCond);
// m_bTsbPdAckWakeup is protected by m_RecvLock in the tsbpd() thread
if (m_bTsbPdAckWakeup)
if (m_bTsbPdNeedsWakeup)
tslcc.notify_one();
}
else
Expand Down Expand Up @@ -8204,7 +8229,8 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
else if (!bNeedFullAck)
{
// Not possible (m_iRcvCurrSeqNo+1 <% m_iRcvLastAck ?)
LOGC(xtlog.Error, log << CONID() << "sendCtrl(UMSG_ACK): IPE: curr %" << ack << " <% last %" << m_iRcvLastAck);
LOGC(xtlog.Error, log << CONID() << "sendCtrl(UMSG_ACK): IPE: curr(" << reason << ") %"
<< ack << " <% last %" << m_iRcvLastAck);
return nbsent;
}

Expand Down
12 changes: 10 additions & 2 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ class CUDT
#endif

int32_t rcvSeqNo() const { return m_iRcvCurrSeqNo; }
SRT_ATTR_REQUIRES(m_RecvAckLock)
int flowWindowSize() const { return m_iFlowWindowSize; }
int32_t deliveryRate() const { return m_iDeliveryRate; }
int bandwidth() const { return m_iBandwidth; }
Expand Down Expand Up @@ -388,6 +389,7 @@ class CUDT

/// Returns the number of packets in flight (sent, but not yet acknowledged).
/// @returns The number of packets in flight belonging to the interval [0; ...)
SRT_ATTR_REQUIRES(m_RecvAckLock)
int32_t getFlightSpan() const
{
return getFlightSpan(m_iSndLastAck, m_iSndCurrSeqNo);
Expand Down Expand Up @@ -697,6 +699,8 @@ class CUDT
/// the receiver fresh loss list.
void unlose(const CPacket& oldpacket);
void dropFromLossLists(int32_t from, int32_t to);

SRT_ATTR_REQUIRES(m_RecvAckLock)
bool getFirstNoncontSequence(int32_t& w_seq, std::string& w_log_reason);

SRT_ATTR_EXCLUDES(m_ConnectionLock)
Expand Down Expand Up @@ -752,6 +756,9 @@ class CUDT
SRT_ATTR_REQUIRES(m_RcvBufferLock)
bool isRcvBufferReadyNoLock() const;

SRT_ATTR_EXCLUDES(m_RcvBufferLock)
bool isRcvBufferFull() const;

// TSBPD thread main function.
static void* tsbpd(void* param);

Expand Down Expand Up @@ -987,7 +994,7 @@ class CUDT

sync::CThread m_RcvTsbPdThread; // Rcv TsbPD Thread handle
sync::Condition m_RcvTsbPdCond; // TSBPD signals if reading is ready. Use together with m_RecvLock
bool m_bTsbPdAckWakeup; // Signal TsbPd thread on Ack sent
bool m_bTsbPdNeedsWakeup; // Signal TsbPd thread to wake up on RCV buffer state change.
sync::Mutex m_RcvTsbPdStartupLock; // Protects TSBPD thread creating and joining

CallbackHolder<srt_listen_callback_fn> m_cbAcceptHook;
Expand Down Expand Up @@ -1136,7 +1143,8 @@ class CUDT
/// @return -2 The incoming packet exceeds the expected sequence by more than a length of the buffer (irrepairable discrepancy).
int handleSocketPacketReception(const std::vector<CUnit*>& incoming, bool& w_new_inserted, bool& w_was_sent_in_order, CUDT::loss_seqs_t& w_srt_loss_seqs);

/// Get the packet's TSBPD time.
/// Get the packet's TSBPD time -
/// the time when it is passed to the reading application.
/// The @a grp passed by void* is not used yet
/// and shall not be used when ENABLE_BONDING=0.
time_point getPktTsbPdTime(void* grp, const CPacket& packet);
Expand Down
4 changes: 2 additions & 2 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1092,8 +1092,8 @@ bool srt::CRendezvousQueue::qualifyToHandle(EReadStatus rst,
if ((rst == RST_AGAIN || i->m_iID != iDstSockID) && tsNow <= tsRepeat)
{
HLOGC(cnlog.Debug,
log << "RID:@" << i->m_iID << std::fixed << count_microseconds(tsNow - tsLastReq) / 1000.0
<< " ms passed since last connection request.");
log << "RID:@" << i->m_iID << " " << FormatDuration<DUNIT_MS>(tsNow - tsLastReq)
<< " passed since last connection request.");

continue;
}
Expand Down
Loading