Skip to content

Commit

Permalink
[core] Extracted IP family dependent changes in stats tools (#2799)
Browse files Browse the repository at this point in the history
from #2677.
  • Loading branch information
ethouris authored Sep 19, 2023
1 parent 6c723e5 commit 66e7468
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 41 deletions.
58 changes: 42 additions & 16 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,13 +480,17 @@ void srt::CUDT::getOpt(SRT_SOCKOPT optName, void *optval, int &optlen)
optlen = sizeof(int);
break;

// For SNDBUF/RCVBUF values take the variant that uses more memory.
// It is not possible to make sure what "family" is in use without
// checking if the socket is bound. This will also be the exact size
// of the memory in use.
case SRTO_SNDBUF:
*(int *)optval = m_config.iSndBufSize * (m_config.iMSS - CPacket::UDP_HDR_SIZE);
*(int *)optval = m_config.iSndBufSize * m_config.bytesPerPkt();
optlen = sizeof(int);
break;

case SRTO_RCVBUF:
*(int *)optval = m_config.iRcvBufSize * (m_config.iMSS - CPacket::UDP_HDR_SIZE);
*(int *)optval = m_config.iRcvBufSize * m_config.bytesPerPkt();
optlen = sizeof(int);
break;

Expand Down Expand Up @@ -768,7 +772,7 @@ void srt::CUDT::getOpt(SRT_SOCKOPT optName, void *optval, int &optlen)

case SRTO_PAYLOADSIZE:
optlen = sizeof(int);
*(int *)optval = (int) m_config.zExpPayloadSize;
*(int *)optval = (int) payloadSize();
break;

case SRTO_KMREFRESHRATE:
Expand Down Expand Up @@ -886,9 +890,13 @@ string srt::CUDT::getstreamid(SRTSOCKET u)
// Initial sequence number, loss, acknowledgement, etc.
void srt::CUDT::clearData()
{
m_iMaxSRTPayloadSize = m_config.iMSS - CPacket::UDP_HDR_SIZE - CPacket::HDR_SIZE;
const size_t full_hdr_size = CPacket::UDP_HDR_SIZE - CPacket::HDR_SIZE;
m_iMaxSRTPayloadSize = m_config.iMSS - full_hdr_size;
HLOGC(cnlog.Debug, log << CONID() << "clearData: PAYLOAD SIZE: " << m_iMaxSRTPayloadSize);

m_SndTimeWindow.initialize(full_hdr_size, m_iMaxSRTPayloadSize);
m_RcvTimeWindow.initialize(full_hdr_size, m_iMaxSRTPayloadSize);

m_iEXPCount = 1;
m_iBandwidth = 1; // pkts/sec
// XXX use some constant for this 16
Expand Down Expand Up @@ -3040,6 +3048,8 @@ bool srt::CUDT::checkApplyFilterConfig(const std::string &confstr)
m_config.sPacketFilterConfig.set(confstr);
}

// XXX Using less maximum payload size of IPv4 and IPv6; this is only about the payload size
// for live.
size_t efc_max_payload_size = SRT_LIVE_MAX_PLSIZE - cfg.extra_size;
if (m_config.zExpPayloadSize > efc_max_payload_size)
{
Expand Down Expand Up @@ -4191,7 +4201,7 @@ EConnectStatus srt::CUDT::processRendezvous(
// This must be done before prepareConnectionObjects(), because it sets ISN and m_iMaxSRTPayloadSize needed to create buffers.
if (!applyResponseSettings(pResponse))
{
LOGC(cnlog.Error, log << CONID() << "processRendezvous: rogue peer");
LOGC(cnlog.Error, log << CONID() << "processRendezvous: peer settings rejected");
return CONN_REJECT;
}

Expand Down Expand Up @@ -4683,6 +4693,12 @@ bool srt::CUDT::applyResponseSettings(const CPacket* pHspkt /*[[nullable]]*/) AT

// Re-configure according to the negotiated values.
m_config.iMSS = m_ConnRes.m_iMSS;

const size_t full_hdr_size = CPacket::UDP_HDR_SIZE + CPacket::HDR_SIZE;
m_iMaxSRTPayloadSize = m_config.iMSS - full_hdr_size;
HLOGC(cnlog.Debug, log << CONID() << "applyResponseSettings: PAYLOAD SIZE: " << m_iMaxSRTPayloadSize);
m_stats.setupHeaderSize(full_hdr_size);

m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize;
const int udpsize = m_config.iMSS - CPacket::UDP_HDR_SIZE;
m_iMaxSRTPayloadSize = udpsize - CPacket::HDR_SIZE;
Expand Down Expand Up @@ -5536,7 +5552,7 @@ int srt::CUDT::rcvDropTooLateUpTo(int seqno)
enterCS(m_StatsLock);
// Estimate dropped bytes from average payload size.
const uint64_t avgpayloadsz = m_pRcvBuffer->getRcvAvgPayloadSize();
m_stats.rcvr.dropped.count(stats::BytesPackets(iDropCnt * avgpayloadsz, (uint32_t) iDropCnt));
m_stats.rcvr.dropped.count(stats::BytesPacketsCount(iDropCnt * avgpayloadsz, (uint32_t) iDropCnt));
leaveCS(m_StatsLock);
}
return iDropCnt;
Expand All @@ -5560,7 +5576,7 @@ void srt::CUDT::setInitialRcvSeq(int32_t isn)
const int iDropCnt = m_pRcvBuffer->dropAll();
const uint64_t avgpayloadsz = m_pRcvBuffer->getRcvAvgPayloadSize();
sync::ScopedLock sl(m_StatsLock);
m_stats.rcvr.dropped.count(stats::BytesPackets(iDropCnt * avgpayloadsz, (uint32_t) iDropCnt));
m_stats.rcvr.dropped.count(stats::BytesPacketsCount(iDropCnt * avgpayloadsz, (uint32_t) iDropCnt));
}

m_pRcvBuffer->setStartSeqNo(isn);
Expand Down Expand Up @@ -5621,6 +5637,13 @@ bool srt::CUDT::prepareBuffers(CUDTException* eout)
{
// CryptoControl has to be initialized and in case of RESPONDER the KM REQ must be processed (interpretSrtHandshake(..)) for the crypto mode to be deduced.
const int authtag = (m_pCryptoControl && m_pCryptoControl->getCryptoMode() == CSrtConfig::CIPHER_MODE_AES_GCM) ? HAICRYPT_AUTHTAG_MAX : 0;

SRT_ASSERT(m_iMaxSRTPayloadSize != 0);

HLOGC(rslog.Debug, log << CONID() << "Creating buffers: snd-plsize=" << m_iMaxSRTPayloadSize
<< " snd-bufsize=" << 32
<< " authtag=" << authtag);

m_pSndBuffer = new CSndBuffer(AF_INET, 32, m_iMaxSRTPayloadSize, authtag);
SRT_ASSERT(m_iPeerISN != -1);
m_pRcvBuffer = new srt::CRcvBuffer(m_iPeerISN, m_config.iRcvBufSize, m_pRcvQueue->m_pUnitQueue, m_config.bMessageAPI);
Expand Down Expand Up @@ -5669,6 +5692,12 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any&
// Uses the smaller MSS between the peers
m_config.iMSS = std::min(m_config.iMSS, w_hs.m_iMSS);

const size_t full_hdr_size = CPacket::UDP_HDR_SIZE + CPacket::HDR_SIZE;
m_iMaxSRTPayloadSize = m_config.iMSS - full_hdr_size;

HLOGC(cnlog.Debug, log << CONID() << "acceptAndRespond: PAYLOAD SIZE: " << m_iMaxSRTPayloadSize);
m_stats.setupHeaderSize(full_hdr_size);

// exchange info for maximum flow window size
m_iFlowWindowSize = w_hs.m_iFlightFlagSize;
m_iPeerISN = w_hs.m_iISN;
Expand All @@ -5690,9 +5719,6 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any&

rewriteHandshakeData(peer, (w_hs));

int udpsize = m_config.iMSS - CPacket::UDP_HDR_SIZE;
m_iMaxSRTPayloadSize = udpsize - CPacket::HDR_SIZE;
HLOGC(cnlog.Debug, log << CONID() << "acceptAndRespond: PAYLOAD SIZE: " << m_iMaxSRTPayloadSize);

// Prepare all structures
if (!prepareConnectionObjects(w_hs, HSD_DRAW, 0))
Expand Down Expand Up @@ -8928,7 +8954,7 @@ void srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt)
enterCS(m_StatsLock);
// Estimate dropped bytes from average payload size.
const uint64_t avgpayloadsz = m_pRcvBuffer->getRcvAvgPayloadSize();
m_stats.rcvr.dropped.count(stats::BytesPackets(iDropCnt * avgpayloadsz, (uint32_t) iDropCnt));
m_stats.rcvr.dropped.count(stats::BytesPacketsCount(iDropCnt * avgpayloadsz, (uint32_t) iDropCnt));
leaveCS(m_StatsLock);
}
}
Expand Down Expand Up @@ -10045,8 +10071,8 @@ int srt::CUDT::handleSocketPacketReception(const vector<CUnit*>& incoming, bool&

const steady_clock::time_point tnow = steady_clock::now();
ScopedLock lg(m_StatsLock);
m_stats.rcvr.dropped.count(stats::BytesPackets(iDropCnt * rpkt.getLength(), iDropCnt));
m_stats.rcvr.undecrypted.count(stats::BytesPackets(rpkt.getLength(), 1));
m_stats.rcvr.dropped.count(stats::BytesPacketsCount(iDropCnt * rpkt.getLength(), iDropCnt));
m_stats.rcvr.undecrypted.count(stats::BytesPacketsCount(rpkt.getLength(), 1));
if (frequentLogAllowed(tnow))
{
LOGC(qrlog.Warn, log << CONID() << "Decryption failed (seqno %" << u->m_Packet.getSeqNo() << "), dropped "
Expand All @@ -10062,8 +10088,8 @@ int srt::CUDT::handleSocketPacketReception(const vector<CUnit*>& incoming, bool&

const steady_clock::time_point tnow = steady_clock::now();
ScopedLock lg(m_StatsLock);
m_stats.rcvr.dropped.count(stats::BytesPackets(iDropCnt* rpkt.getLength(), iDropCnt));
m_stats.rcvr.undecrypted.count(stats::BytesPackets(rpkt.getLength(), 1));
m_stats.rcvr.dropped.count(stats::BytesPacketsCount(iDropCnt* rpkt.getLength(), iDropCnt));
m_stats.rcvr.undecrypted.count(stats::BytesPacketsCount(rpkt.getLength(), 1));
if (frequentLogAllowed(tnow))
{
LOGC(qrlog.Warn, log << CONID() << "Packet not encrypted (seqno %" << u->m_Packet.getSeqNo() << "), dropped "
Expand Down Expand Up @@ -10263,7 +10289,7 @@ int srt::CUDT::processData(CUnit* in_unit)

ScopedLock lg(m_StatsLock);
const uint64_t avgpayloadsz = m_pRcvBuffer->getRcvAvgPayloadSize();
m_stats.rcvr.lost.count(stats::BytesPackets(loss * avgpayloadsz, (uint32_t) loss));
m_stats.rcvr.lost.count(stats::BytesPacketsCount(loss * avgpayloadsz, (uint32_t) loss));

HLOGC(qrlog.Debug,
log << CONID() << "LOSS STATS: n=" << loss << " SEQ: [" << CSeqNo::incseq(m_iRcvCurrPhySeqNo) << " "
Expand Down
7 changes: 7 additions & 0 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,13 @@ class CUDT

int64_t sndDuration; // real time for sending
time_point sndDurationCounter; // timers to record the sending Duration

void setupHeaderSize(int hsize)
{
sndr.setupHeaderSize(hsize);
rcvr.setupHeaderSize(hsize);
}

} m_stats;

public:
Expand Down
3 changes: 3 additions & 0 deletions srtcore/socketconfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ struct CSrtConfig: CSrtMuxerConfig

bool payloadSizeFits(size_t val, int ip_family, std::string& w_errmsg) ATR_NOTHROW;

// This function returns the number of bytes that are allocated
// for a single packet in the sender and receiver buffer.
int bytesPerPkt() const { return iMSS - int(CPacket::UDP_HDR_SIZE); }
};

template <typename T>
Expand Down
82 changes: 67 additions & 15 deletions srtcore/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace stats
class Packets
{
public:
typedef Packets count_type;

Packets() : m_count(0) {}

Packets(uint32_t num) : m_count(num) {}
Expand All @@ -46,27 +48,20 @@ class Packets
uint32_t m_count;
};

class BytesPackets
class BytesPacketsCount
{
public:
BytesPackets()
BytesPacketsCount()
: m_bytes(0)
, m_packets(0)
{}

BytesPackets(uint64_t bytes, uint32_t n = 1)
BytesPacketsCount(uint64_t bytes, uint32_t n = 1)
: m_bytes(bytes)
, m_packets(n)
{}

BytesPackets& operator+= (const BytesPackets& other)
{
m_bytes += other.m_bytes;
m_packets += other.m_packets;
return *this;
}

public:
void reset()
{
m_packets = 0;
Expand All @@ -89,28 +84,62 @@ class BytesPackets
return m_packets;
}

uint64_t bytesWithHdr() const
BytesPacketsCount& operator+= (const BytesPacketsCount& other)
{
return m_bytes + m_packets * CPacket::SRT_DATA_HDR_SIZE;
m_bytes += other.m_bytes;
m_packets += other.m_packets;
return *this;
}

private:
protected:
uint64_t m_bytes;
uint32_t m_packets;
};

template <class METRIC_TYPE>
class BytesPackets: public BytesPacketsCount
{
public:
typedef BytesPacketsCount count_type;

// Set IPv4-based header size value as a fallback. This will be fixed upon connection.
BytesPackets()
: m_zPacketHeaderSize(CPacket::UDP_HDR_SIZE + CPacket::HDR_SIZE)
{}

public:

void setupHeaderSize(int size)
{
m_zPacketHeaderSize = uint64_t(size);
}

uint64_t bytesWithHdr() const
{
return m_bytes + m_packets * m_zPacketHeaderSize;
}

private:
uint64_t m_zPacketHeaderSize;
};

template <class METRIC_TYPE, class BASE_METRIC_TYPE = METRIC_TYPE>
struct Metric
{
METRIC_TYPE trace;
METRIC_TYPE total;

void count(METRIC_TYPE val)
void count(typename METRIC_TYPE::count_type val)
{
trace += val;
total += val;
}

void setupHeaderSize(int loc)
{
trace.setupHeaderSize(loc);
total.setupHeaderSize(loc);
}

void reset()
{
trace.reset();
Expand All @@ -137,6 +166,16 @@ struct Sender
Metric<Packets> recvdAck; // The number of ACK packets received by the sender.
Metric<Packets> recvdNak; // The number of ACK packets received by the sender.

void setupHeaderSize(int hdr_size)
{
#define SETHSIZE(var) var.setupHeaderSize(hdr_size)
SETHSIZE(sent);
SETHSIZE(sentUnique);
SETHSIZE(sentRetrans);
SETHSIZE(dropped);
#undef SETHSIZE
}

void reset()
{
sent.reset();
Expand Down Expand Up @@ -180,6 +219,19 @@ struct Receiver
Metric<Packets> sentAck; // The number of ACK packets sent by the receiver.
Metric<Packets> sentNak; // The number of NACK packets sent by the receiver.

void setupHeaderSize(int hdr_size)
{
#define SETHSIZE(var) var.setupHeaderSize(hdr_size)
SETHSIZE(recvd);
SETHSIZE(recvdUnique);
SETHSIZE(recvdRetrans);
SETHSIZE(lost);
SETHSIZE(dropped);
SETHSIZE(recvdBelated);
SETHSIZE(undecrypted);
#undef SETHSIZE
}

void reset()
{
recvd.reset();
Expand Down
8 changes: 4 additions & 4 deletions srtcore/window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ int acknowledge(Seq* r_aSeq, const size_t size, int& r_iHead, int& r_iTail, int3

////////////////////////////////////////////////////////////////////////////////

void srt::CPktTimeWindowTools::initializeWindowArrays(int* r_pktWindow, int* r_probeWindow, int* r_bytesWindow, size_t asize, size_t psize)
void srt::CPktTimeWindowTools::initializeWindowArrays(int* r_pktWindow, int* r_probeWindow, int* r_bytesWindow, size_t asize, size_t psize, size_t max_payload_size)
{
for (size_t i = 0; i < asize; ++ i)
r_pktWindow[i] = 1000000; //1 sec -> 1 pkt/sec
Expand All @@ -154,11 +154,11 @@ void srt::CPktTimeWindowTools::initializeWindowArrays(int* r_pktWindow, int* r_p
r_probeWindow[k] = 1000; //1 msec -> 1000 pkts/sec

for (size_t i = 0; i < asize; ++ i)
r_bytesWindow[i] = srt::CPacket::SRT_MAX_PAYLOAD_SIZE; //based on 1 pkt/sec set in r_pktWindow[i]
r_bytesWindow[i] = max_payload_size; //based on 1 pkt/sec set in r_pktWindow[i]
}


int srt::CPktTimeWindowTools::getPktRcvSpeed_in(const int* window, int* replica, const int* abytes, size_t asize, int& bytesps)
int srt::CPktTimeWindowTools::getPktRcvSpeed_in(const int* window, int* replica, const int* abytes, size_t asize, size_t hdr_size, int& bytesps)
{
// get median value, but cannot change the original value order in the window
std::copy(window, window + asize, replica);
Expand Down Expand Up @@ -191,7 +191,7 @@ int srt::CPktTimeWindowTools::getPktRcvSpeed_in(const int* window, int* replica,
// claculate speed, or return 0 if not enough valid value
if (count > (asize >> 1))
{
bytes += (srt::CPacket::SRT_DATA_HDR_SIZE * count); //Add protocol headers to bytes received
bytes += (hdr_size * count); //Add protocol headers to bytes received
bytesps = (int)ceil(1000000.0 / (double(sum) / double(bytes)));
return (int)ceil(1000000.0 / (sum / count));
}
Expand Down
Loading

0 comments on commit 66e7468

Please sign in to comment.