Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MAINT] Added more elaborate log suppression and applied to RCV-DROPPED logs #2733

Merged
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ endforeach()
# SRT_DEBUG_BONDING_STATES 1
# SRT_DEBUG_RTT 1 /* RTT trace */
# SRT_MAVG_SAMPLING_RATE 40 /* Max sampling rate */
# SRT_ENABLE_FREQUENT_LOG_TRACE 0 : set to 1 to enable printing reason for suppressed freq logs

# option defaults
set(ENABLE_HEAVY_LOGGING_DEFAULT OFF)
Expand Down
16 changes: 16 additions & 0 deletions srtcore/atomic.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,22 @@ class atomic {
#endif
}

T operator|=(T i) {
#if defined(ATOMIC_USE_SRT_SYNC_MUTEX) && (ATOMIC_USE_SRT_SYNC_MUTEX == 1)
ScopedLock lg_(mutex_);
const T t = value_ |= i;
return t;
#elif defined(ATOMIC_USE_GCC_INTRINSICS)
return __atomic_or_fetch(&value_, i, __ATOMIC_SEQ_CST);
#elif defined(ATOMIC_USE_MSVC_INTRINSICS)
return msvc::interlocked<T>::or_fetch(&value_, i);
#elif defined(ATOMIC_USE_CPP11_ATOMIC)
return value_ |= i;
#else
#error "Implement Me."
#endif
}

/// @brief Performs an atomic compare-and-swap (CAS) operation.
///
/// The value of the atomic object is only updated to the new value if the
Expand Down
5 changes: 5 additions & 0 deletions srtcore/atomic_msvc.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ struct interlocked<T, 1> {
return static_cast<T>(_InterlockedExchange8(
reinterpret_cast<volatile char*>(x), static_cast<const char>(new_val)));
}

static inline T or_fetch(T volatile* x, const T val) {
return static_cast<T>(_InterlockedOr8(
reinterpret_cast<volatile char*>(x), static_cast<const char>(val)));
}
};

template <typename T>
Expand Down
89 changes: 74 additions & 15 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,7 @@ void srt::CUDT::open()
#endif

m_iReXmitCount = 1;
memset(&m_aSuppressedMsg, 0, sizeof m_aSuppressedMsg);
m_iPktCount = 0;
m_iLightACKCount = 1;
m_tsNextSendTime = steady_clock::time_point();
Expand Down Expand Up @@ -5419,9 +5420,19 @@ void * srt::CUDT::tsbpd(void* param)
<< iDropCnt << " packets) playable at " << FormatTime(info.tsbpd_time) << " delayed "
<< (timediff_us / 1000) << "." << std::setw(3) << std::setfill('0') << (timediff_us % 1000) << " ms");
#endif
LOGC(brlog.Warn, log << self->CONID() << "RCV-DROPPED " << iDropCnt << " packet(s). Packet seqno %" << info.seqno
<< " delayed for " << (timediff_us / 1000) << "." << std::setw(3) << std::setfill('0')
<< (timediff_us % 1000) << " ms");
string why;
if (self->frequentLogAllowed(FREQLOGFA_RCV_DROPPED, tnow, (why)))
{
LOGC(brlog.Warn, log << self->CONID() << "RCV-DROPPED " << iDropCnt << " packet(s). Packet seqno %" << info.seqno
<< " delayed for " << (timediff_us / 1000) << "." << std::setw(3) << std::setfill('0')
<< (timediff_us % 1000) << " ms " << why);
}
#if SRT_ENABLE_FREQUENT_LOG_TRACE
else
{
LOGC(brlog.Warn, log << "SUPPRESSED: RCV-DROPPED LOG: " << why);
}
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
#endif
#endif

tsNextDelivery = steady_clock::time_point(); // Ready to read, nothing to wait for.
Expand Down Expand Up @@ -5878,13 +5889,44 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any&
addressAndSend((response));
}

bool srt::CUDT::frequentLogAllowed(const time_point& tnow) const
bool srt::CUDT::frequentLogAllowed(size_t logid, const time_point& tnow, std::string& w_why)
{
#ifndef SRT_LOG_SLOWDOWN_FREQ_MS
#define SRT_LOG_SLOWDOWN_FREQ_MS 1000
#endif

return (m_tsLogSlowDown + milliseconds_from(SRT_LOG_SLOWDOWN_FREQ_MS)) <= tnow;
bool is_suppressed = IsSet(m_LogSlowDownExpired, BIT(logid));
bool isnow = (m_tsLogSlowDown.load() + milliseconds_from(SRT_LOG_SLOWDOWN_FREQ_MS)) <= tnow;
if (isnow)
{
// Theoretically this should prevent other calls of this function to take
// set their values simultaneously, but if it happened that the time is
// also set, this section will not fire for the other log, if it didn't do
// the check yet.
m_LogSlowDownExpired.store(uint8_t(BIT(logid))); // Clear all other bits

// Note: it may happen that two threads could intermix one another between
// the check and setting up, but this will at worst case set the slightly
// later time again.
m_tsLogSlowDown.store(tnow);

is_suppressed = false;

int supr = m_aSuppressedMsg[logid];

if (supr > 0)
w_why = Sprint("++SUPPRESSED: ", supr);
m_aSuppressedMsg[logid] = 0;
}
else
{
w_why = Sprint("Too early - last one was ", FormatDuration<DUNIT_MS>(tnow - m_tsLogSlowDown.load()));
// Set YOUR OWN bit, atomically.
m_LogSlowDownExpired |= uint8_t(BIT(logid));
++m_aSuppressedMsg[logid];
}

return !is_suppressed;
}

// This function is required to be called when a caller receives an INDUCTION
Expand Down Expand Up @@ -8947,15 +8989,25 @@ void srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt)

if (iDropCnt > 0)
{
LOGC(brlog.Warn, log << CONID() << "RCV-DROPPED " << iDropCnt << " packet(s), seqno range %"
<< dropdata[0] << "-%" << dropdata[1] << ", msgno " << ctrlpkt.getMsgSeq(using_rexmit_flag)
<< " (SND DROP REQUEST).");
ScopedLock lg (m_StatsLock);
const steady_clock::time_point tnow = steady_clock::now();
string why;
if (frequentLogAllowed(FREQLOGFA_RCV_DROPPED, tnow, (why)))
{
LOGC(brlog.Warn, log << CONID() << "RCV-DROPPED " << iDropCnt << " packet(s), seqno range %"
<< dropdata[0] << "-%" << dropdata[1] << ", msgno " << ctrlpkt.getMsgSeq(using_rexmit_flag)
<< " (SND DROP REQUEST). " << why);
}
#if SRT_ENABLE_FREQUENT_LOG_TRACE
else
{
LOGC(brlog.Warn, log << "SUPPRESSED: RCV-DROPPED LOG: " << why);
}
#endif

enterCS(m_StatsLock);
// Estimate dropped bytes from average payload size.
const uint64_t avgpayloadsz = m_pRcvBuffer->getRcvAvgPayloadSize();
m_stats.rcvr.dropped.count(stats::BytesPacketsCount(iDropCnt * avgpayloadsz, (uint32_t) iDropCnt));
leaveCS(m_StatsLock);
}
}
// When the drop request was received, it means that there are
Expand Down Expand Up @@ -10073,12 +10125,19 @@ int srt::CUDT::handleSocketPacketReception(const vector<CUnit*>& incoming, bool&
ScopedLock lg(m_StatsLock);
m_stats.rcvr.dropped.count(stats::BytesPacketsCount(iDropCnt * rpkt.getLength(), iDropCnt));
m_stats.rcvr.undecrypted.count(stats::BytesPacketsCount(rpkt.getLength(), 1));
if (frequentLogAllowed(tnow))
string why;
if (frequentLogAllowed(FREQLOGFA_ENCRYPTION_FAILURE, tnow, (why)))
{
LOGC(qrlog.Warn, log << CONID() << "Decryption failed (seqno %" << u->m_Packet.getSeqNo() << "), dropped "
<< iDropCnt << ". pktRcvUndecryptTotal=" << m_stats.rcvr.undecrypted.total.count() << ".");
m_tsLogSlowDown = tnow;
<< iDropCnt << ". pktRcvUndecryptTotal=" << m_stats.rcvr.undecrypted.total.count() << "." << why);
}
#if SRT_ENABLE_FREQUENT_LOG_TRACE
else
{

LOGC(qrlog.Warn, log << "SUPPRESSED: Decryption failed LOG: " << why);
}
#endif
}
}
else if (m_pCryptoControl && m_pCryptoControl->getCryptoMode() == CSrtConfig::CIPHER_MODE_AES_GCM)
Expand All @@ -10090,11 +10149,11 @@ int srt::CUDT::handleSocketPacketReception(const vector<CUnit*>& incoming, bool&
ScopedLock lg(m_StatsLock);
m_stats.rcvr.dropped.count(stats::BytesPacketsCount(iDropCnt* rpkt.getLength(), iDropCnt));
m_stats.rcvr.undecrypted.count(stats::BytesPacketsCount(rpkt.getLength(), 1));
if (frequentLogAllowed(tnow))
string why;
if (frequentLogAllowed(FREQLOGFA_ENCRYPTION_FAILURE, tnow, (why)))
{
LOGC(qrlog.Warn, log << CONID() << "Packet not encrypted (seqno %" << u->m_Packet.getSeqNo() << "), dropped "
<< iDropCnt << ". pktRcvUndecryptTotal=" << m_stats.rcvr.undecrypted.total.count() << ".");
m_tsLogSlowDown = tnow;
}
}
}
Expand Down
14 changes: 12 additions & 2 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ modified by

#include <haicrypt.h>

#ifndef SRT_ENABLE_FREQUENT_LOG_TRACE
#define SRT_ENABLE_FREQUENT_LOG_TRACE 0
#endif


// TODO: Utility function - to be moved to utilities.h?
template <class T>
Expand Down Expand Up @@ -922,14 +926,20 @@ class CUDT
SRT_ATTR_GUARDED_BY(m_RecvAckLock)
int32_t m_iReXmitCount; // Re-Transmit Count since last ACK

time_point m_tsLogSlowDown; // The last time a log message from the "slow down" group was shown.
static const size_t
MAX_FREQLOGFA = 2,
FREQLOGFA_ENCRYPTION_FAILURE = 0,
FREQLOGFA_RCV_DROPPED = 1;
atomic_time_point m_tsLogSlowDown; // The last time a log message from the "slow down" group was shown.
// The "slow down" group of logs are those that can be printed too often otherwise, but can't be turned off (warnings and errors).
// Currently only used by decryption failure message, therefore no mutex protection needed.
sync::atomic<uint8_t> m_LogSlowDownExpired; // Can't use bitset because atomic
sync::atomic<int> m_aSuppressedMsg[MAX_FREQLOGFA];

/// @brief Check if a frequent log can be shown.
/// @param tnow current time
/// @return true if it is ok to print a frequent log message.
bool frequentLogAllowed(const time_point& tnow) const;
bool frequentLogAllowed(size_t logid, const time_point& tnow, std::string& why);

private: // Receiving related data
CRcvBuffer* m_pRcvBuffer; //< Receiver buffer
Expand Down