From 4ed8aadff61b61537db62910d642d63740d7bb5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Fri, 24 Apr 2020 11:52:47 +0200 Subject: [PATCH] Added balancing group implementation. --- srtcore/api.cpp | 5 + srtcore/core.cpp | 1509 +++++++++++++++++++++++++++++++++++++++++++++- srtcore/core.h | 66 +- 3 files changed, 1553 insertions(+), 27 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 5c68bca23..9cf27e982 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -1294,6 +1294,11 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPDATA* targets, int arra int isn = g.currentSchedSequence(); + // Don't synchronize ISN in case of balancing groups. Every link + // may send their own payloads independently. + if (g.type() == SRT_GTYPE_BALANCING) + isn = -1; + // We got it. Bind the socket, if the source address was set if (!source_addr.empty()) bind(ns, source_addr); diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 58e464fd3..9a4ce0b61 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -207,6 +207,9 @@ void CUDT::construct() m_uKmRefreshRatePkt = 0; m_uKmPreAnnouncePkt = 0; + m_iSndMinFlightSpan = -1; // -1 value means "not measured". Normally all current values of -1 are rejected. + // (note that flight == 0 is still a valid value) + // Initilize mutex and condition variables initSynch(); @@ -3602,23 +3605,30 @@ void CUDT::synchronizeWithGroup(CUDTGroup* gp) // with updateAfterSrtHandshake(). updateSrtSndSettings(); - // These are the values that are normally set initially by setters. 
- int32_t snd_isn = m_iSndLastAck, rcv_isn = m_iRcvLastAck; - if (!gp->applyGroupSequences(m_SocketID, (snd_isn), (rcv_isn))) + if (gp->type() == SRT_GTYPE_BALANCING) { - HLOGC(mglog.Debug, log << "synchronizeWithGroup: @" << m_SocketID - << " DERIVED ISN: RCV=%" << m_iRcvLastAck << " -> %" << rcv_isn - << " (shift by " << CSeqNo::seqcmp(rcv_isn, m_iRcvLastAck) - << ") SND=%" << m_iSndLastAck << " -> %" << snd_isn - << " (shift by " << CSeqNo::seqcmp(snd_isn, m_iSndLastAck) << ")"); - setInitialRcvSeq(rcv_isn); - setInitialSndSeq(snd_isn); + HLOGC(mglog.Debug, log << "synchronizeWithGroup: @" << m_SocketID << ": NOT synchronizing sequence numbers."); } else { - HLOGC(mglog.Debug, log << "synchronizeWithGroup: @" << m_SocketID - << " DEFINED ISN: RCV=%" << m_iRcvLastAck - << " SND=%" << m_iSndLastAck); + // These are the values that are normally set initially by setters. + int32_t snd_isn = m_iSndLastAck, rcv_isn = m_iRcvLastAck; + if (!gp->applyGroupSequences(m_SocketID, (snd_isn), (rcv_isn))) + { + HLOGC(mglog.Debug, log << "synchronizeWithGroup: @" << m_SocketID + << " DERIVED ISN: RCV=%" << m_iRcvLastAck << " -> %" << rcv_isn + << " (shift by " << CSeqNo::seqcmp(rcv_isn, m_iRcvLastAck) + << ") SND=%" << m_iSndLastAck << " -> %" << snd_isn + << " (shift by " << CSeqNo::seqcmp(snd_isn, m_iSndLastAck) << ")"); + setInitialRcvSeq(rcv_isn); + setInitialSndSeq(snd_isn); + } + else + { + HLOGC(mglog.Debug, log << "synchronizeWithGroup: @" << m_SocketID + << " DEFINED ISN: RCV=%" << m_iRcvLastAck + << " SND=%" << m_iSndLastAck); + } } } @@ -6462,6 +6472,15 @@ int CUDT::sendmsg2(const char *data, int len, SRT_MSGCTRL& w_mctrl) return 0; } + if (w_mctrl.msgno != -1) // most unlikely, unless you use balancing groups + { + if (w_mctrl.msgno < 1 || w_mctrl.msgno > MSGNO_SEQ_MAX) + { + LOGC(dlog.Error, log << "INVALID forced msgno " << w_mctrl.msgno << ": can be -1 (trap) or <1..." 
<< MSGNO_SEQ_MAX << ">"); + throw CUDTException(MJ_NOTSUP, MN_INVAL); + } + } + int msttl = w_mctrl.msgttl; bool inorder = w_mctrl.inorder; @@ -8085,6 +8104,7 @@ void CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_point CGuard ack_lock(m_RecvAckLock); m_iFlowWindowSize -= CSeqNo::seqoff(m_iSndLastAck, ackdata_seqno); m_iSndLastAck = ackdata_seqno; + m_iSndMinFlightSpan = getFlightSpan(); // TODO: m_tsLastRspAckTime should be protected with m_RecvAckLock // because the sendmsg2 may want to change it at the same time. @@ -8137,6 +8157,7 @@ void CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_point // Update Flow Window Size, must update before and together with m_iSndLastAck m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT]; m_iSndLastAck = ackdata_seqno; + m_iSndMinFlightSpan = getFlightSpan(); m_tsLastRspAckTime = currtime; m_iReXmitCount = 1; // Reset re-transmit count since last ACK } @@ -10737,7 +10758,7 @@ void CUDT::checkRexmitTimer(const steady_clock::time_point& currtime) if ((is_laterexmit && unsent_seqno != m_iSndLastAck && m_pSndLossList->getLossLength() == 0) // OR: // - FASTREXMIT - // - flight window > 0 + // - flight window > 0 (getFlightSpan never returns negative values) || (is_fastrexmit && getFlightSpan() != 0)) { retransmit = true; @@ -11122,6 +11143,8 @@ CUDTGroup::SocketData CUDTGroup::prepareData(CUDTSocket* s) -1, -1, sockaddr_any(), sockaddr_any(), false, false, false, + 0.0, // load factor: no load in the beginning + 1.0, // unit load: how much one packet would increase the load 0 // weight }; return sd; @@ -11151,11 +11174,14 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype) , m_tsStartTime() , m_tsRcvPeerStartTime() , m_RcvBaseSeqNo(SRT_SEQNO_NONE) + , m_RcvBaseMsgNo(-1) , m_bOpened(false) , m_bConnected(false) , m_bClosing(false) , m_iLastSchedSeqNo(SRT_SEQNO_NONE) , m_iLastSchedMsgNo(SRT_MSGNO_NONE) + , m_uBalancingRoll(0) + , m_RandomCredit(16) { setupMutex(m_GroupLock, "Group"); 
setupMutex(m_RcvDataLock, "RcvData"); @@ -11167,18 +11193,23 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype) switch (gtype) { case SRT_GTYPE_BROADCAST: + //m_bSynchOnMsgNo = false; m_selfManaged = true; break; case SRT_GTYPE_BACKUP: + //m_bSynchOnMsgNo = false; m_selfManaged = true; break; case SRT_GTYPE_BALANCING: + //m_bSynchOnMsgNo = true; m_selfManaged = true; + m_cbSelectLink.set(this, &CUDTGroup::linkSelect_window_fw); break; case SRT_GTYPE_MULTICAST: + //m_bSynchOnMsgNo = false; m_selfManaged = false; break; @@ -11721,11 +11752,10 @@ int CUDTGroup::send(const char* buf, int len, SRT_MSGCTRL& w_mc) case SRT_GTYPE_BACKUP: return sendBackup(buf, len, (w_mc)); - /* to be implemented - case SRT_GTYPE_BALANCING: return sendBalancing(buf, len, (w_mc)); + /* to be implemented case SRT_GTYPE_MULTICAST: return sendMulticast(buf, len, (w_mc)); */ @@ -12512,6 +12542,11 @@ void CUDTGroup::updateWriteState() // by getting the sequence number. int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) { + // Balancing is a special case when packets are sorted out basing on the + // message number. + if (m_type == SRT_GTYPE_BALANCING) + return recvBalancing(buf, len, (w_mc)); + typedef map::iterator pit_t; // Later iteration over it might be less efficient than // by vector, but we'll also often try to check a single id @@ -12823,7 +12858,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) { // The position is not known, so get the position on which // the socket is currently standing. 
- pair ee = m_Positions.insert(make_pair(id, ReadPos(ps->core().m_iRcvLastSkipAck))); + pair ee = m_Positions.insert(make_pair(id, ReadPos(ps->core().m_iRcvLastSkipAck, type()))); p = &(ee.first->second); HLOGC(dlog.Debug, log << "group/recv: EPOLL: @" << id << " %" << p->mctrl.pktseq << " NEW SOCKET INSERTED"); } @@ -13156,6 +13191,42 @@ CUDTGroup::ReadPos* CUDTGroup::checkPacketAhead() return out; } +CUDTGroup::ReadPos* CUDTGroup::checkPacketAheadMsgno() +{ + typedef map::iterator pit_t; + ReadPos* out = 0; + + // This map no longer maps only ahead links. + // Here are all links, and whether ahead, it's defined by the sequence. + for (pit_t i = m_Positions.begin(); i != m_Positions.end(); ++i) + { + // i->first: socket ID + // i->second: ReadPos { sequence, packet } + // We are not interested with the socket ID because we + // aren't going to read from it - we have the packet already. + ReadPos& a = i->second; + + int ndiff = MsgNo(a.mctrl.msgno) - MsgNo(m_RcvBaseMsgNo); + + if (ndiff == 1) + { + // The very next packet. Return it. 
+ HLOGC(dlog.Debug, log << "group/recvBalancing/checkPacketAhead: Base: #" << m_RcvBaseMsgNo << " ahead delivery POSSIBLE #" + << a.mctrl.msgno << " from @" << i->first); + out = &a; + } + else if (ndiff < 1 && !a.packet.empty()) + { + HLOGC(dlog.Debug, log << "group/recvBalancing/checkPacketAhead: @" << i->first << " dropping collected ahead #" + << a.mctrl.msgno << " with base #" << m_RcvBaseMsgNo); + a.packet.clear(); + } + // In case when it's >1, keep it in ahead + } + + return out; +} + const char* CUDTGroup::StateStr(CUDTGroup::GroupState st) { static const char* const states [] = { "PENDING", "IDLE", "RUNNING", "BROKEN" }; @@ -14460,18 +14531,18 @@ int CUDTGroup::configure(const char* str) string config = str; switch (type()) { - /* TMP review stub case SRT_GTYPE_BALANCING: + case SRT_GTYPE_BALANCING: // config contains the algorithm name - if (config == "" || config == "plain") - { - m_cbSelectLink.set(this, &CUDTGroup::linkSelect_plain_fw); - HLOGC(mglog.Debug, log << "group(balancing): PLAIN algorithm selected"); - } - else if (config == "window") + if (config == "" || config == "window") { m_cbSelectLink.set(this, &CUDTGroup::linkSelect_window_fw); HLOGC(mglog.Debug, log << "group(balancing): WINDOW algorithm selected"); } + else if (config == "fixed") + { + m_cbSelectLink.set(this, &CUDTGroup::linkSelect_fixed_fw); + HLOGC(mglog.Debug, log << "group(balancing): FIXED algorithm selected"); + } else { LOGC(mglog.Error, log << "group(balancing): unknown selection algorithm '" @@ -14479,7 +14550,7 @@ int CUDTGroup::configure(const char* str) return CUDT::APIError(MJ_NOTSUP, MN_INVAL, 0); } - break;*/ + break; default: if (config == "") @@ -14494,3 +14565,1391 @@ int CUDTGroup::configure(const char* str) return 0; } + +int CUDTGroup::baseOffset(SRT_MSGCTRL& mctrl) +{ + return CSeqNo::seqoff(m_RcvBaseSeqNo, mctrl.pktseq); +} + +int CUDTGroup::baseOffset(CUDTGroup::ReadPos& pos) +{ + return CSeqNo::seqoff(m_RcvBaseSeqNo, pos.mctrl.pktseq); +} + +bool 
CUDTGroup::seqDiscrepancy(SRT_MSGCTRL& mctrl) +{ + // Here it's predicted that the incoming packet's seq may only be + // at most 1/8 of the whole range in advance. + + // seqoff returns the distance between two sequences, + // which is the less of the two possible distances that + // one can be reached from another. The precondition + // is here that this difference, in whichever direction, + // shall not be greater than 1/4 of the whole wheel. + return abs(CSeqNo::seqoff(m_RcvBaseSeqNo, mctrl.pktseq)) > CSeqNo::m_iSeqNoTH/2; +} + +bool CUDTGroup::msgDiscrepancy(SRT_MSGCTRL& mctrl) +{ + // Here it's predicted that the incoming packet's seq may only be + // at most 1/8 of the whole range in advance. + + // seqoff returns the distance between two sequences, + // which is the less of the two possible distances that + // one can be reached from another. The precondition + // is here that this difference, in whichever direction, + // shall not be greater than 1/4 of the whole wheel. + return abs(MsgNo(mctrl.msgno) - MsgNo(m_RcvBaseMsgNo)) > int32_t(MsgNo::HALF/2); +} + + +int CUDTGroup::recvBalancing(char* buf, int len, SRT_MSGCTRL& w_mctrl) +{ + typedef map::iterator pit_t; + // Later iteration over it might be less efficient than + // by vector, but we'll also often try to check a single id + // if it was ever seen broken, so that it's skipped. + set broken; + + // Remember them now because they will be overwritten. + SRT_SOCKGROUPDATA* out_grpdata = (w_mctrl.grpdata); + size_t out_grpdata_size = w_mctrl.grpdata_size; + + size_t output_size = 0; + + for (;;) + { + if (!m_bOpened || !m_bConnected) + { + LOGC(dlog.Error, log << boolalpha << "grp/recvBalancing: ERROR opened=" << m_bOpened << " connected=" << m_bConnected); + throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0); + } + + HLOGC(dlog.Debug, log << "grp/recvBalancing: START. Buffer size=" << len); + + // Check first the ahead packets if you have any to deliver. 
+ if (m_RcvBaseMsgNo != -1 && !m_Positions.empty()) + { + // This function also updates the group sequence pointer. + ReadPos* pos = checkPacketAheadMsgno(); + if (pos) + { + if (size_t(len) < pos->packet.size()) + throw CUDTException(MJ_NOTSUP, MN_XSIZE, 0); + + HLOGC(dlog.Debug, log << "grp/recvBalancing: delivering AHEAD packet %" << pos->mctrl.pktseq << " #" << pos->mctrl.msgno + << ": " << BufferStamp(&pos->packet[0], pos->packet.size()) << " - updating base"); + memcpy(buf, &pos->packet[0], pos->packet.size()); + fillGroupData((w_mctrl), pos->mctrl, (out_grpdata), out_grpdata_size); + len = pos->packet.size(); + pos->packet.clear(); + m_RcvBaseMsgNo = pos->mctrl.msgno; + + // We predict to have only one packet ahead, others are pending to be reported by tsbpd. + // This will be "re-enabled" if the later check puts any new packet into ahead. + m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false); + + return len; + } + } + + // LINK QUALIFICATION NAMES: + // + // HORSE: Correct link, which delivers the very next sequence. + // Not necessarily this link is currently active. + // + // KANGAROO: Got some packets dropped and the sequence number + // of the packet jumps over the very next sequence and delivers + // an ahead packet. + // + // ELEPHANT: Is not ready to read, while others are, or reading + // up to the current latest delivery sequence number does not + // reach this sequence and the link becomes non-readable earlier. + + // The above condition has ruled out one kangaroo and turned it + // into a horse. + + // Below there's a loop that will try to extract packets. Kangaroos + // will be among the polled ones because skipping them risks that + // the elephants will take over the reading. Links already known as + // elephants will be also polled in an attempt to revitalize the + // connection that experienced just a short living choking. 
+ // + // After polling we attempt to read from every link that reported + // read-readiness and read at most up to the sequence equal to the + // current delivery sequence. + + // Links that deliver a packet below that sequence will be retried + // until they deliver no more packets or deliver the packet of + // expected sequence. Links that don't have a record in m_Positions + // and report readiness will be always read, at least to know what + // sequence they currently stand on. + // + // Links that are already known as kangaroos will be polled, but + // no reading attempt will be done. If after the reading series + // it will turn out that we have no more horses, the slowest kangaroo + // will be "upgraded to a horse" (the ahead link with a sequence + // closest to the current delivery sequence will get its sequence + // set as current delivered and its recorded ahead packet returned + // as the read packet). + + // If we find at least one horse, the packet read from that link + // will be delivered. All other link will be just ensured update + // up to this sequence number, or at worst all available packets + // will be read. In this case all kangaroos remain kangaroos, + // until the current delivery sequence m_RcvBaseMsgNo will be lifted + // to the sequence recorded for these links in m_Positions, + // during the next time ahead check, after which they will become + // horses. + +#if ENABLE_HEAVY_LOGGING + std::ostringstream ds; + ds << "E(" << m_RcvEID << ") "; +#define HCLOG(expr) expr +#else +#define HCLOG(x) if (false) {} +#endif + + bool still_alive = false; + size_t size = 0; + + // You can't lock the whole group for that + // action because this will result in a deadlock. + // Prepare first the list of sockets to be added as connect-pending + // and as read-ready, then unlock the group, and then add them to epoll. 
+ vector read_ready, connect_pending; + + { + HLOGC(dlog.Debug, log << "grp/recvBalancing: Reviewing member sockets to epoll-add (locking)"); + CGuard glock (m_GroupLock); + for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi) + { + ++size; // list::size loops over all elements anyway, so this hits two birds with one stone + if (gi->laststatus == SRTS_CONNECTING) + { + HCLOG(ds << "@" << gi->id << " "); + /* + connect_pending.push_back(gi->id); + */ + + continue; // don't read over a failed or pending socket + } + + if (gi->laststatus >= SRTS_BROKEN) + { + broken.insert(gi->ps); + } + + if (broken.count(gi->ps)) + { + HCLOG(ds << "@" << gi->id << " "); + continue; + } + + if (gi->laststatus != SRTS_CONNECTED) + { + HCLOG(ds << "@" << gi->id << "laststatus) << "> "); + // Sockets in this state are ignored. We are waiting until it + // achieves CONNECTING state, then it's added to write. + // Or gets broken and closed in the next step. + continue; + } + + still_alive = true; + + // Don't skip packets that are ahead because if we have a situation + // that all links are either "elephants" (do not report read readiness) + // and "kangaroos" (have already delivered an ahead packet) then + // omiting kangaroos will result in only elephants to be polled for + // reading. Due to the strict timing requirements and ensurance that + // TSBPD on every link will result in exactly the same delivery time + // for a packet of given sequence, having an elephant and kangaroo in + // one cage means that the elephant is simply a broken or half-broken + // link (the data are not delivered, but it will get repaired soon, + // enough for SRT to maintain the connection, but it will still drop + // packets that didn't arrive in time), in both cases it may + // potentially block the reading for an indefinite time, while + // simultaneously a kangaroo might be a link that got some packets + // dropped, but then it's still capable to deliver packets on time. 
+ + // Note that gi->id might be a socket that was previously being polled + // on write, when it's attempting to connect, but now it's connected. + // This will update the socket with the new event set. + + read_ready.push_back(gi->id); + HCLOG(ds << "@" << gi->id << "[READ] "); + } + } + + int read_modes = SRT_EPOLL_IN | SRT_EPOLL_ERR; + + /* Done at the connecting stage so that it won't be missed. + + int connect_modes = SRT_EPOLL_OUT | SRT_EPOLL_ERR; + for (vector::iterator i = connect_pending.begin(); i != connect_pending.end(); ++i) + { + srt_epoll_add_usock(m_RcvEID, *i, &connect_modes); + } + + AND this below additionally for sockets that were so far pending connection, + will be now "upgraded" to readable sockets. The epoll adding function for a + socket that already is in the eid container will only change the poll flags, + but will not re-add it, that is, event reports that are both in old and new + flags will survive the operation. + + */ + + for (vector::iterator i = read_ready.begin(); i != read_ready.end(); ++i) + { + srt_epoll_add_usock(m_RcvEID, *i, &read_modes); + } + + HLOGC(dlog.Debug, log << "grp/recvBalancing: " << ds.str() << " --> EPOLL/SWAIT"); +#undef HCLOG + + if (!still_alive) + { + LOGC(dlog.Error, log << "grp/recvBalancing: all links broken"); + throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0); + } + + // Here we need to make an additional check. + // There might be a possibility that all sockets that + // were added to the reader group, are ahead. At least + // surely we don't have a situation that any link contains + // an ahead-read subsequent packet, because GroupCheckPacketAhead + // already handled that case. 
+ // + // What we can have is that every link has: + // - no known seq position yet (is not registered in the position map yet) + // - the position equal to the latest delivered sequence + // - the ahead position + + // Now the situation is that we don't have any packets + // waiting for delivery so we need to wait for any to report one. + + // XXX We support blocking mode only at the moment. + // The non-blocking mode would need to simply check the readiness + // with only immediate report, and read-readiness would have to + // be done in background. + + // Poll on this descriptor until reading is available, indefinitely. + CEPoll::fmap_t sready; + + // In blocking mode, use m_iRcvTimeOut, which's default value -1 + // means to block indefinitely, also in swait(). + // In non-blocking mode use 0, which means to always return immediately. + int timeout = m_bSynRecving ? m_iRcvTimeOut : 0; + int nready = m_pGlobal->m_EPoll.swait(*m_RcvEpolld, sready, timeout, false /*report by retval*/); + + HLOGC(dlog.Debug, log << "grp/recvBalancing: RDY: " << DisplayEpollResults(sready)); + + if (nready == 0) + { + // This can only happen when 0 is passed as timeout and none is ready. + // And 0 is passed only in non-blocking mode. So this is none ready in + // non-blocking mode. + throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0); + } + + // Handle sockets of pending connection and with errors. + + // Nice to have something like: + + // broken = FilterIf(sready, [] (auto s) + // { return s.second == SRT_EPOLL_ERR && (auto cs = g->locateSocket(s.first, ERH_RETURN)) + // ? {cs, true} + // : {nullptr, false} + // }); + + FilterIf( + /*FROM*/ sready.begin(), sready.end(), + /*TO*/ std::inserter(broken, broken.begin()), + /*VIA*/ FLookupSocketWithEvent(m_pGlobal, SRT_EPOLL_ERR)); + + // Ok, now we need to have some extra qualifications: + // 1. If a socket has no registry yet, we read anyway, just + // to notify the current position. 
We read ONLY ONE PACKET this time, + // we'll worry later about adjusting it to the current group sequence + // position. + // 2. If a socket is already position ahead, DO NOT read from it, even + // if it is ready. + + // The state of things whether we were able to extract the very next + // sequence will be simply defined by the fact that `output` is nonempty. + + int32_t next_msgno = m_RcvBaseMsgNo; + + // If this set is empty, it won't roll even once, therefore output + // will be surely empty. This will be checked then same way as when + // reading from every socket resulted in error. + + HLOGC(dlog.Debug, log << "grp/recvBalancing: Reviewing read-ready sockets: " << DisplayEpollResults(sready)); + + for (CEPoll::fmap_t::const_iterator i = sready.begin(); i != sready.end(); ++i) + { + if (i->second & SRT_EPOLL_ERR) + continue; // broken already + + if ((i->second & SRT_EPOLL_IN) == 0) + continue; // not ready for reading + + // Check if this socket is in aheads + // If so, don't read from it, wait until the ahead is flushed. + SRTSOCKET id = i->first; + CUDTSocket* ps = m_pGlobal->locateSocket(id); // exception would interrupt it (SANITY) + ReadPos* p = NULL; + pit_t pe = m_Positions.find(id); + if (pe != m_Positions.end()) + { + p = &pe->second; + + // Possible results of comparison: + // x < 0: the sequence is in the past, the socket should be adjusted FIRST + // x = 0: the socket should be ready to get the exactly next packet + // x = 1: the case is already handled by GroupCheckPacketAhead. + // x > 1: AHEAD. DO NOT READ. + int ndiff = MsgNo(p->mctrl.msgno) - MsgNo(m_RcvBaseMsgNo); + if (ndiff > 1) + { + HLOGC(dlog.Debug, log << "grp/recvBalancing: EPOLL: @" << id << " #" << p->mctrl.msgno + << " AHEAD #" << m_RcvBaseMsgNo << ", not reading."); + continue; + } + } + else + { + // The position is not known, so get the position on which + // the socket is currently standing. Could be unknown, too. 
+ pair ee = m_Positions.insert(make_pair(id, ReadPos(ps->core().m_pRcvBuffer->getTopMsgno(), SRT_GTYPE_BALANCING))); + p = &(ee.first->second); + HLOGC(dlog.Debug, log << "grp/recvBalancing: EPOLL: @" << id << " #" << p->mctrl.msgno << " NEW SOCKET INSERTED"); + } + + // Read from this socket stubbornly, until: + // - reading is no longer possible (AGAIN) + // - the sequence difference is >= 1 + + char lostbuf[SRT_LIVE_MAX_PLSIZE]; + for (;;) + { + SRT_MSGCTRL mctrl = srt_msgctrl_default; + char* used_output = 0; + + // Read the data into the user's buffer. This is an optimistic + // prediction that we'll read the right data. This will be overwritten + // by "more correct data" if found more appropriate later. But we have to + // copy these data anyway anywhere, even if they need to fall on the floor later. + int stat; + if (output_size) + { + // We have already the data, so this must fall on the floor + stat = ps->core().receiveMessage((lostbuf), SRT_LIVE_MAX_PLSIZE, (mctrl), CUDTUnited::ERH_RETURN); + HLOGC(dlog.Debug, log << "grp/recvBalancing: @" << id << " IGNORED data with %" << mctrl.pktseq << " #" << mctrl.msgno + << ": " << (stat <= 0 ? "(NOTHING)" : BufferStamp(lostbuf, stat))); + used_output = lostbuf; + } + else + { + stat = ps->core().receiveMessage((buf), len, (mctrl), CUDTUnited::ERH_RETURN); + HLOGC(dlog.Debug, log << "grp/recvBalancing: @" << id << " EXTRACTED data with %" << mctrl.pktseq << " #" << mctrl.msgno + << ": " << (stat <= 0 ? "(NOTHING)" : BufferStamp(buf, stat))); + used_output = buf; + } + if (stat == 0) + { + HLOGC(dlog.Debug, log << "grp/recvBalancing: SPURIOUS epoll, ignoring"); + // This is returned in case of "again". In case of errors, we have SRT_ERROR. + // Do not treat this as spurious, just stop reading. 
+ break; + } + + if (stat == SRT_ERROR) + { + HLOGC(dlog.Debug, log << "grp/recvBalancing: @" << id << ": " << srt_getlasterror_str()); + broken.insert(ps); + break; + } + + // NOTE: checks against m_RcvBaseMsgNo and decisions based on it + // must NOT be done if m_RcvBaseMsgNo is -1, which means that we + // are about to deliver the very first packet and we take its + // sequence number as a good deal. + + // The order must be: + // - check discrepancy + // - record the sequence + // - check ordering. + // The second one must be done always, but failed discrepancy + // check should exclude the socket from any further checks. + // That's why the common check for m_RcvBaseSeqNo != -1 can't + // embrace everything below. + + // We need to first qualify the sequence, just for a case + if (m_RcvBaseSeqNo != -1 && msgDiscrepancy(mctrl)) + { + // This error should be returned if the link turns out + // to be the only one, or set to the group data. + // err = SRT_ESECFAIL; + LOGC(dlog.Error, log << "grp/recvBalancing: @" << id << ": SEQUENCE DISCREPANCY: base=%" + << m_RcvBaseSeqNo << " vs pkt=%" << mctrl.pktseq << ", setting ESECFAIL"); + broken.insert(ps); + break; + } + + // Rewrite it to the state for a case when next reading + // would not succeed. Do not insert the buffer here because + // this is only required when the sequence is ahead; for that + // it will be fixed later. + p->mctrl.msgno = mctrl.msgno; + + if (m_RcvBaseMsgNo != -1) + { + // Now we can safely check it. + int ndiff = MsgNo(mctrl.msgno) - MsgNo(m_RcvBaseMsgNo); + if (ndiff <= 0) + { + HLOGC(dlog.Debug, log << "grp/recvBalancing: @" << id << " #" << mctrl.msgno + << " BEHIND base=#" << m_RcvBaseMsgNo << " - discarding"); + // The sequence is recorded, the packet has to be discarded. + // That's all. + continue; + } + + // Now we have only two possibilities: + // ndiff == 1: The very next message, we want to read and return the packet. 
+ // ndiff > 1: The packet is ahead - record the ahead packet, but continue with the others. + + if (ndiff > 1) + { + HLOGC(dlog.Debug, log << "@" << id << " #" << mctrl.msgno + << " AHEAD by " << ndiff << " for base=#" << m_RcvBaseMsgNo << " - recorded !" + << BufferStamp(used_output, stat) << " size=" << stat); + p->packet.assign(used_output, used_output+stat); + p->mctrl = mctrl; + break; // Don't read from that socket anymore. + } + } + else + { + HLOGC(dlog.Debug, log << "grp/recvBalancing: base not yet set; predicted #" << mctrl.msgno); + } + + // We have seqdiff = 1, or we simply have the very first packet + // which's sequence is taken as a good deal. Update the sequence + // and record output. + + if (output_size) + { + HLOGC(dlog.Debug, log << "grp/recvBalancing: @" << id << " #" << mctrl.msgno << " REDUNDANT"); + break; + } + + HLOGC(dlog.Debug, log << "grp/recvBalancing: @" << id << " #" << mctrl.msgno + << " DELIVERING; next msgno will be #" << mctrl.msgno); + output_size = stat; + fillGroupData((w_mctrl), mctrl, (out_grpdata), out_grpdata_size); + + // Record, but do not update yet, until all sockets are handled. + next_msgno = mctrl.msgno; + break; + } + } + +#if ENABLE_HEAVY_LOGGING + if (!broken.empty()) + { + std::ostringstream brks; + for (set::iterator b = broken.begin(); b != broken.end(); ++b) + brks << "@" << (*b)->m_SocketID << " "; + LOGC(dlog.Debug, log << "grp/recvBalancing: REMOVING BROKEN: " << brks.str()); + } +#endif + + // Now remove all broken sockets from aheads, if any. + // Even if they have already delivered a packet. 
+ for (set::iterator di = broken.begin(); di != broken.end(); ++di) + { + CUDTSocket* ps = *di; + m_Positions.erase(ps->m_SocketID); + m_pGlobal->close(ps); + } + + if (broken.size() >= size) // This > is for sanity check + { + // All broken + HLOGC(dlog.Debug, log << "grp/recvBalancing: All sockets broken"); + m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true); + + throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); + } + + + // May be required to be re-read. + broken.clear(); + + if (output_size) + { + // We have extracted something, meaning that we have the sequence shift. + // Update it now and don't do anything else with the sockets. + + // Sanity check + if (next_msgno == -1) + { + LOGP(dlog.Error, "IPE: next_msgno not set after output extracted!"); + + // This should never happen, but the only way to keep the code + // safe an recoverable is to use the incremented sequence. By + // leaving the sequence as is there's a risk of hangup. + m_RcvBaseMsgNo = ++MsgNo(m_RcvBaseMsgNo); + } + else + { + HLOGC(dlog.Debug, log << "grp/recvBalancing: new base #" << next_msgno << " - replacing previous #" << m_RcvBaseMsgNo); + m_RcvBaseMsgNo = next_msgno; + } + + ReadPos* pos = checkPacketAheadMsgno(); + if (!pos) + { + // Don't clear the read-readinsess state if you have a packet ahead because + // if you have, the next read call will return it. + m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false); + } + + HLOGC(dlog.Debug, log << "grp/recvBalancing: successfully extacted packet size=" << output_size << " - returning"); + return output_size; + } + + HLOGC(dlog.Debug, log << "grp/recvBalancing: NOT extracted anything - checking for a need to kick kangaroos"); + + // Check if we have any sockets left :D + + // Here we surely don't have any more HORSES, + // only ELEPHANTS and KANGAROOS. Qualify them and + // attempt to at least take advantage of KANGAROOS. 
+ + // In this position all links are either: + // - updated to the current position + // - updated to the newest possible possition available + // - not yet ready for extraction (not present in the group) + + // If we haven't extracted the very next sequence position, + // it means that we might only have the ahead packets read, + // that is, the next sequence has been dropped by all links. + + if (!m_Positions.empty()) + { + // This might notify both lingering links, which didn't + // deliver the required sequence yet, and links that have + // the sequence ahead. Review them, and if you find at + // least one packet behind, just wait for it to be ready. + // Use again the waiting function because we don't want + // the general waiting procedure to skip others. + set elephants; + + // const because it's `typename decltype(m_Positions)::value_type` + pair* slowest_kangaroo = 0; + + for (pit_t rp = m_Positions.begin(); rp != m_Positions.end(); ++rp) + { + // NOTE that m_RcvBaseMsgNo in this place wasn't updated + // because we haven't successfully extracted anything. + int ncmp = MsgNo(rp->second.mctrl.msgno) - MsgNo(m_RcvBaseMsgNo); + if (ncmp < 0) + { + elephants.insert(rp->first); + } + // If ncmp == 0, we have a socket ON TRACK. + else if (ncmp > 0) + { + // If there's already a slowest_kangaroo, ncmp decides if this one is slower. + // Otherwise it is always slower by having no competition. + if (!slowest_kangaroo + || MsgNo(slowest_kangaroo->second.mctrl.msgno) > MsgNo(rp->second.mctrl.msgno)) + { + slowest_kangaroo = &*rp; + } + } + } + + // Note that if no "slowest_kangaroo" was found, it means + // that we don't have kangaroos. + if (slowest_kangaroo) + { + // We have a slowest kangaroo. Elephants must be ignored. + // Best case, they will get revived, worst case they will be + // soon broken. + // + // As we already have the packet delivered by the slowest + // kangaroo, we can simply return it. 
+ + m_RcvBaseMsgNo = slowest_kangaroo->second.mctrl.msgno; + vector& pkt = slowest_kangaroo->second.packet; + if (size_t(len) < pkt.size()) + throw CUDTException(MJ_NOTSUP, MN_XSIZE, 0); + + HLOGC(dlog.Debug, log << "@" << slowest_kangaroo->first << " KANGAROO->HORSE #" + << slowest_kangaroo->second.mctrl.msgno + << ": " << BufferStamp(&pkt[0], pkt.size())); + + memcpy(buf, &pkt[0], pkt.size()); + fillGroupData((w_mctrl), slowest_kangaroo->second.mctrl, (out_grpdata), out_grpdata_size); + len = pkt.size(); + pkt.clear(); + + // It is unlikely to have a packet ahead because usually having one packet jumped-ahead + // clears the possibility of having aheads at all. + // XXX Research if this is possible at all; if it isn't, then don't waste time on + // looking for it. + ReadPos* pos = checkPacketAheadMsgno(); + if (!pos) + { + // Don't clear the read-readinsess state if you have a packet ahead because + // if you have, the next read call will return it. + m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false); + } + return len; + } + + HLOGC(dlog.Debug, log << "grp/recvBalancing: " + << (elephants.empty() ? "NO LINKS REPORTED ANY FRESHER PACKET." : "ALL LINKS ELEPHANTS.") + << " Re-polling."); + } + else + { + HLOGC(dlog.Debug, log << "grp/recvBalancing: POSITIONS EMPTY - Re-polling."); + } + } +} + +int CUDTGroup::sendBalancing(const char* buf, int len, SRT_MSGCTRL& w_mc) +{ + // Avoid stupid errors in the beginning. + if (len <= 0) + { + throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); + } + + // NOTE: This is a "vector of list iterators". Every element here + // is an iterator to another container. + // Note that "list" is THE ONLY container in standard C++ library, + // for which NO ITERATORS ARE INVALIDATED after a node at particular + // iterator has been removed, except for that iterator itself. 
+ vector wipeme; + vector pending; + + w_mc.msgno = -1; + + CGuard guard (m_GroupLock); + + // Always set the same exactly message number for the payload + // sent over all links.Regardless whether it will be used to synchronize + // the streams or not. + if (m_iLastSchedMsgNo != -1) + { + HLOGC(dlog.Debug, log << "grp/sendBalancing: setting message number: " << m_iLastSchedMsgNo); + w_mc.msgno = m_iLastSchedMsgNo; + } + else + { + HLOGP(dlog.Debug, "grp/sendBalancing: NOT setting message number - waiting for the first successful sending"); + } + + + // Overview loop + for (gli_t d = m_Group.begin(); d != m_Group.end(); ++d) + { + d->sndresult = 0; // set as default + + // Check socket sndstate before sending + if (d->sndstate == GST_BROKEN) + { + HLOGC(dlog.Debug, log << "grp/sendBalancing: socket in BROKEN state: @" << d->id << ", sockstatus=" << SockStatusStr(d->ps ? d->ps->getStatus() : SRTS_NONEXIST)); + wipeme.push_back(d); + d->sndresult = -1; + + /* + This distinction is now blocked - it has led to blocking removal of + authentically broken sockets that just got only incorrect state update. + (XXX This problem has to be fixed either, but when epoll is rewritten it + will be fixed from the start anyway). + + // Check if broken permanently + if (!d->ps || d->ps->getStatus() == SRTS_BROKEN) + { + HLOGC(dlog.Debug, log << "... permanently. Will delete it from group $" << id()); + wipeme.push_back(d); + } + else + { + HLOGC(dlog.Debug, log << "... socket still " << SockStatusStr(d->ps ? d->ps->getStatus() : SRTS_NONEXIST)); + } + */ + continue; + } + + if (d->sndstate == GST_IDLE) + { + SRT_SOCKSTATUS st = SRTS_NONEXIST; + if (d->ps) + st = d->ps->getStatus(); + // If the socket is already broken, move it to broken. 
+ if (int(st) >= int(SRTS_BROKEN)) + { + HLOGC(dlog.Debug, log << "CUDTGroup::send.$" << id() << ": @" << d->id << " became " + << SockStatusStr(st) << ", WILL BE CLOSED."); + wipeme.push_back(d); + d->sndstate = GST_BROKEN; + d->sndresult = -1; + continue; + } + + if (st != SRTS_CONNECTED) + { + HLOGC(dlog.Debug, log << "CUDTGroup::send. @" << d->id << " is still " << SockStatusStr(st) << ", skipping."); + pending.push_back(d); + continue; + } + + HLOGC(dlog.Debug, log << "grp/sendBalancing: socket in IDLE state: @" << d->id << " - ACTIVATING it"); + d->sndstate = GST_RUNNING; + continue; + } + + if (d->sndstate == GST_RUNNING) + { + HLOGC(dlog.Debug, log << "grp/sendBalancing: socket in RUNNING state: @" << d->id << " - will send a payload"); + continue; + } + + HLOGC(dlog.Debug, log << "grp/sendBalancing: socket @" << d->id << " not ready, state: " + << StateStr(d->sndstate) << "(" << int(d->sndstate) << ") - NOT sending, SET AS PENDING"); + + pending.push_back(d); + } + + SRT_ATR_UNUSED CUDTException cx (MJ_SUCCESS, MN_NONE, 0); + BalancingLinkState lstate = { m_Group.active(), 0, 0 }; + int stat = -1; + gli_t selink; // will be initialized first in the below loop + + for (;;) + { + // Repeatable block. + // The algorithm is more-less: + // + // 1. Select a link to use for sending + // 2. Perform the operation + // 3. If the operation succeeded, record this link and exit with success + // 4. If the operation failed, call selector again, this time with error info + // 5. The selector can return a link to use again, or gli_NULL() if the operation should fail + // 6. If the selector returned a valid link, go to p. 2. + + // Call selection. Default: defaultSelectLink + selink = CALLBACK_CALL(m_cbSelectLink, lstate); + + if (selink == m_Group.null()) + { + stat = -1; // likely not possible, but make sure. + break; + } + + // Sanity check + if (selink->sndstate != GST_RUNNING) + { + LOGC(mglog.Error, log << "IPE: sendBalancing: selectLink returned an iactive link! 
- trying blindly anyway"); + } + + // Perform the operation + int erc = SRT_SUCCESS; + try + { + // This must be wrapped in try-catch because on error it throws an exception. + // Possible return values are only 0, in case when len was passed 0, or a positive + // >0 value that defines the size of the data that it has sent, that is, in case + // of Live mode, equal to 'len'. + CUDTSocket* ps = selink->ps; + InvertedLock ug (m_GroupLock); + + HLOGC(dlog.Debug, log << "grp/sendBalancing: SENDING #" << w_mc.msgno << " through link [" << m_uBalancingRoll << "]"); + + // NOTE: EXCEPTION PASSTHROUGH. + stat = ps->core().sendmsg2(buf, len, (w_mc)); + } + catch (CUDTException& e) + { + cx = e; + stat = -1; + erc = e.getErrorCode(); + } + + selink->sndresult = stat; + + if (stat != -1) + { + if (m_iLastSchedMsgNo == -1) + { + // Initialize this number + HLOGC(dlog.Debug, log << "grp/sendBalancing: INITIALIZING message number: " << w_mc.msgno); + m_iLastSchedMsgNo = w_mc.msgno; + } + + m_Group.active(selink); + + // Sending succeeded. Complete the rest of the activities. + break; + } + + // Handle the error. If a link got the blocking error, set + // this link PENDING state. This will cause that this link be + // activated at the next sending call and retried, but in this + // session it will be skipped. + if (erc == SRT_EASYNCSND) + { + selink->sndstate = GST_PENDING; + } + else + { + selink->sndstate = GST_BROKEN; + if (std::find(wipeme.begin(), wipeme.end(), selink) == wipeme.end()) + wipeme.push_back(selink); // unique add + } + + lstate.ilink = selink; + lstate.status = stat; + lstate.errorcode = erc; + + // Now repeat selection. + // Note that every selection either gets a link that + // succeeds (and this loop is broken) or the link becomes + // broken, and then it should be skipped by the selector. + // Eventually with all links broken the selector will return + // no link to be used, and therefore this operation is interrupted + // and error-reported. 
+ }
+
+ send_CheckPendingSockets(pending, (wipeme));
+
+
+ // Do final checkups.
+
+ // Now complete the status data in the function and return.
+ // This is the case for both successful and failed return.
+
+ size_t grpsize = m_Group.size();
+
+ if (w_mc.grpdata_size < grpsize)
+ {
+ w_mc.grpdata = NULL;
+ }
+
+ size_t i = 0;
+
+ // Fill the array first before removal.
+
+ bool ready_again = false;
+ for (gli_t d = m_Group.begin(); d != m_Group.end(); ++d, ++i)
+ {
+ if (w_mc.grpdata)
+ {
+ // Enough space to fill
+ w_mc.grpdata[i].id = d->id;
+ w_mc.grpdata[i].status = d->laststatus;
+
+ if (d->sndstate == GST_RUNNING)
+ w_mc.grpdata[i].result = d->sndresult;
+ else if (d->sndstate == GST_IDLE)
+ w_mc.grpdata[i].result = 0;
+ else
+ w_mc.grpdata[i].result = -1;
+
+ memcpy(&w_mc.grpdata[i].peeraddr, &d->peer, d->peer.size());
+ }
+
+ // We perform this loop anyway because we still need to check if any
+ // socket is writable. Note that the group lock will hold any write ready
+ // updates that are performed just after a single socket update for the
+ // group, so if any socket is actually ready at the moment when this
+ // is performed, and this one will result in none-write-ready, this will
+ // be fixed just after returning from this function.
+
+ ready_again = ready_again | d->ps->writeReady();
+ }
+
+ send_CloseBrokenSockets((wipeme));
+
+ w_mc.grpdata_size = i;
+
+ if (!ready_again)
+ {
+ m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false);
+ }
+
+ // If m_iLastSchedMsgNo wasn't initialized above, don't touch it.
+ if (m_iLastSchedMsgNo != -1) + { + m_iLastSchedMsgNo = ++MsgNo(m_iLastSchedMsgNo); + HLOGC(dlog.Debug, log << "grp/sendBalancing: updated msgno: " << m_iLastSchedMsgNo); + } + + if (stat == -1) + throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); + + return stat; +} + +CUDTGroup::gli_t CUDTGroup::linkSelect_plain(const CUDTGroup::BalancingLinkState& state) +{ + if (state.ilink == gli_NULL()) + { + // Very first sending operation. Pick up the first link + return m_Group.begin(); + } + + gli_t this_link = state.ilink; + + for (;;) + { + // Roll to the next link + ++this_link; + if (this_link == m_Group.end()) + this_link = m_Group.begin(); // roll around + + // Check the status. If the link is PENDING or BROKEN, + // skip it. If the link is IDLE, turn it to ACTIVE. + // If the rolling reached back to the original link, + // and this one isn't usable either, return gli_NULL(). + + if (this_link->sndstate == GST_IDLE) + this_link->sndstate = GST_RUNNING; + + if (this_link->sndstate == GST_RUNNING) + { + // Found you, buddy. Go on. + return this_link; + } + + if (this_link == state.ilink) + { + // No more links. Sorry. + return gli_NULL(); + } + + // Check maybe next link... + } + + return this_link; +} + +struct LinkCapableData +{ + CUDTGroup::gli_t link; + int flight; +}; + +CUDTGroup::gli_t CUDTGroup::linkSelect_UpdateAndReport(CUDTGroup::gli_t this_link) +{ + // When a link is used for sending, the load factor is + // increased by this link's unit load, which is calculated + // basing on how big share among all flight sizes this link has. + // The larger the flight window, the bigger the unit load. + // This unit load then defines how much "it costs" to send + // a packet over that link. The bigger this value is then, + // the less often will this link be selected among others. 
+ + this_link->load_factor += this_link->unit_load; + + HLOGC(dlog.Debug, log << "linkSelect(any): link #" << distance(m_Group.begin(), this_link) + << " selected, upd load_factor=" << this_link->load_factor + << " from unit-load=" << this_link->unit_load); + return this_link; +} + +CUDTGroup::gli_t CUDTGroup::linkSelect_window(const CUDTGroup::BalancingLinkState& state) +{ + if (m_RandomCredit > 0) + { + HLOGC(dlog.Debug, log << "linkSelect_window: remaining credit: " << m_RandomCredit + << " - staying with equal balancing"); + --m_RandomCredit; + return linkSelect_UpdateAndReport(linkSelect_plain(state)); + } + + gli_t this_link = gli_NULL(); + + vector linkdata; + int total_flight = 0; + int number_links = 0; + + // First, collect data required for selection + vector linkorder; + + gli_t last = state.ilink; + ++last; + // NOTE: ++last could make it == m_Group.end() in which + // case the first loop will get 0 passes and the second + // one will be from begin() to end(). + for (gli_t li = last; li != m_Group.end(); ++li) + linkorder.push_back(li); + for (gli_t li = m_Group.begin(); li != last; ++li) + linkorder.push_back(li); + + // Sanity check + if (linkorder.empty()) + { + LOGC(dlog.Error, log << "linkSelect_window: IPE: no links???"); + return gli_NULL(); + } + + // Fallback + this_link = *linkorder.begin(); + + // This does the following: + // We have links: [ 1 2 3 4 5 ] + // Last used link was 4 + // linkorder: [ (5) (1) (2) (3) (4) ] + for (vector::iterator i = linkorder.begin(); i != linkorder.end(); ++i) + { + gli_t li = *i; + int flight = li->ps->core().m_iSndMinFlightSpan; + + HLOGC(dlog.Debug, log << "linkSelect_window: previous link was #" << distance(m_Group.begin(), state.ilink) + << " Checking link #" << distance(m_Group.begin(), li) + << "@" << li->id << " TO " << SockaddrToString(li->peer) + << " flight=" << flight); + + // Upgrade idle to running + if (li->sndstate == GST_IDLE) + li->sndstate = GST_RUNNING; + + if (li->sndstate != GST_RUNNING) 
+ {
+ HLOGC(dlog.Debug, log << "linkSelect_window: ... state=" << StateStr(li->sndstate) << " - skipping");
+ // Skip pending/broken links
+ continue;
+ }
+
+ // Check if this link was used at least once so far.
+ // If not, select it immediately.
+ if (li->load_factor == 0)
+ {
+ HLOGC(dlog.Debug, log << "linkSelect_window: ... load factor empty: SELECTING.");
+ this_link = li;
+ return linkSelect_UpdateAndReport(this_link);
+ }
+
+ ++number_links;
+ if (flight == -1)
+ {
+ HLOGC(dlog.Debug, log << "linkSelect_window: link #" << distance(m_Group.begin(), this_link)
+ << " HAS NO FLIGHT COUNTED - selecting, deferring to next 16 * numberlinks=" << number_links << " packets.");
+ // Not measurable flight. Use this link.
+ this_link = li;
+
+ // Also defer next measurement point by 16 per link.
+ // Of course, number_links doesn't contain the exact
+ // number of active links (the loop is underway), but
+ // it doesn't matter much. The probability is on the
+ // side of later links, so it's unlikely that earlier
+ // links could enforce more often update (worst case
+ // scenario, the probing will happen again in 16 packets).
+ m_RandomCredit = 16 * number_links; + + return linkSelect_UpdateAndReport(this_link); + } + flight += 2; // prevent having 0 used for equations + + total_flight += flight; + + LinkCapableData lcd = {li, flight}; + linkdata.push_back(lcd); + } + + if (linkdata.empty()) + { + HLOGC(dlog.Debug, log << "linkSelect_window: no capable links found - requesting transmission interrupt!"); + return gli_NULL(); + } + + this_link = linkdata.begin()->link; + double least_load = linkdata.begin()->link->load_factor; + double biggest_unit_load = 0; + + HLOGC(dlog.Debug, log << "linkSelect_window: total_flight (with fix): " << total_flight + << " - updating link load factors:"); + // Now that linkdata list is ready, update the link span values + // If at least one link has the span value not yet measureable + for (vector::iterator i = linkdata.begin(); + i != linkdata.end(); ++i) + { + // Here update the unit load basing on the percentage + // of the link flight size. + // + // The sum of all flight window sizes from all links is + // the total number. The value of the flight size for + // each link shows how much of a percentage this link + // has as share. + // + // Example: in case when all links go totally equally, + // and there is 5 links, each having 10 packets in flight: + // + // total_flight = 50 + // share_load = link_flight / total_flight = 10/50 = 1/5 + // link_load = share_load * number_links = 1/5 * 5 = 1.0 + // + // If the links are not perfectly equivalent, some deviation + // towards 1.0 will result. + double share_load = double(i->flight) / total_flight; + double link_load = share_load * number_links; + i->link->unit_load = link_load; + + // Another example: 3 links, 2 of them have 10 packets + // in flight, 1 has 20 packets. 
+ + // total_flight = 40 + // share_load = link_flight / total_flight = 10/40 = 1/4 ; 20/30 = 1/2 + // link_load = share_load * number_links + // link_load[10] = 1/4 * 3 = 0.75; + // link_load[20] = 1/2 * 3 = 1.50; + + HLOGC(dlog.Debug, log << "linkSelect_window: ... #" << distance(m_Group.begin(), i->link) + << " flight=" << i->flight << " share_load=" << (100*share_load) << "% unit-load=" + << link_load << " current-load:" << i->link->load_factor); + + if (link_load > biggest_unit_load) + biggest_unit_load = link_load; + + if (i->link->load_factor < least_load) + { + HLOGC(dlog.Debug, log << "linkSelect_window: ... this link has currently smallest load"); + this_link = i->link; + least_load = i->link->load_factor; + } + } + + HLOGC(dlog.Debug, log << "linkSelect_window: selecting link #" << distance(m_Group.begin(), this_link)); + // Now that a link is selected and all load factors updated, + // do a CUTOFF by the value of at least one size of unit load. + + + // This comparison can be used to recognize if all values of + // the load factor have already exceeded the value that should + // result in a cutoff. + if (biggest_unit_load > 0 && least_load > 2 * biggest_unit_load) + { + for (vector::iterator i = linkdata.begin(); + i != linkdata.end(); ++i) + { + i->link->load_factor -= biggest_unit_load; + } + HLOGC(dlog.Debug, log << "linkSelect_window: cutting off value of " << biggest_unit_load + << " from all load factors"); + } + + // The above loop certainly found something. + return linkSelect_UpdateAndReport(this_link); +} + +CUDTGroup::gli_t CUDTGroup::linkSelect_fixed(const CUDTGroup::BalancingLinkState& state) +{ + gli_t this_link = gli_NULL(); + + int total_weight = 0; + int total_links = 0; + + vector equi_links; + + // Get weight information from every link. + // This should be done every time because between + // the calls a link could be added a new or closed. 
+ + for (gli_t li = m_Group.begin(); li != m_Group.end(); ++li) + { + // Upgrade idle to running + if (li->sndstate == GST_IDLE) + li->sndstate = GST_RUNNING; + + if (li->sndstate != GST_RUNNING) + { + HLOGC(dlog.Debug, log << "linkSelect_fixed: ... state=" << StateStr(li->sndstate) << " - skipping"); + // Skip pending/broken links + continue; + } + + if (this_link == gli_NULL()) + this_link = li; + + // Don't count this link if its weight == 0. + if (li->weight != 0) + { + total_weight += li->weight; + ++total_links; + } + else + { + equi_links.push_back(li); + } + } + + if (state.ilink == gli_NULL()) + return this_link; // either first found or gli_NULL(). + + int avg_weight = 1; + if (total_links) // Fix for a case when weight wasn't set on any link + { + int avg = total_weight/total_links; + if (avg) // Fix for a case when some1 set weight 1 on 1 link and 0 on other 2. + avg_weight = avg; + } + + for (vector::iterator i = equi_links.begin(); i != equi_links.end(); ++i) + { + gli_t li = *i; + li->weight = avg_weight; + total_weight += avg_weight; + ++total_links; + } + + // average weight remains the same, there's only at best + // changed the total weight. + + // Ok, now we have all links' weights ensured, + // so now we can calculate unit load for each one + + // First, collect data required for selection + vector linkorder; + + gli_t last = state.ilink; + ++last; + // NOTE: ++last could make it == m_Group.end() in which + // case the first loop will get 0 passes and the second + // one will be from begin() to end(). 
+ for (gli_t li = last; li != m_Group.end(); ++li) + linkorder.push_back(li); + for (gli_t li = m_Group.begin(); li != last; ++li) + linkorder.push_back(li); + + // Looping on `linkorder` is required because: + // We have links: [ 1 2 3 4 5 ] + // Last used link was 4 + // linkorder: [ (5) (1) (2) (3) (4) ] + + // Sanity check + if (linkorder.empty()) + { + LOGC(dlog.Error, log << "linkSelect_fixed: IPE: no links???"); + return gli_NULL(); + } + + // Fallback + this_link = *linkorder.begin(); + + double least_load = this_link->load_factor; + double biggest_unit_load = 0; + + HLOGC(dlog.Debug, log << "linkSelect_fixed: total_weight= " << total_weight + << " avg=" << avg_weight + << " - updating link load factors:"); + // Now that linkdata list is ready, update the link span values + // If at least one link has the span value not yet measureable + for (vector::iterator i = linkorder.begin(); i != linkorder.end(); ++i) + { + gli_t li = *i; + if (li->sndstate != GST_RUNNING) + { + continue; + } + + // Here update the unit load basing on the percentage + // of the link's weight. + // + // The average is a base to calculate the cost of sending + // depending on desired load percentage. Particular link's + // cost of sending is a positive number that rolls around 1. + // We state that a link which's load equals to average load + // should have the cost equal to 1. The cost of sending should + // be reverse-proportional to the desired load percentage + // and it should be 1 deviated by proportional overstatement + // or understatement towards the average. + // + // deviation = (average - link_weight) / average + // unit_load = 1 + deviation; + // + // The rule for unit_load is such that a sum of all unit_loads + // from all active links must be equal to the number of links. + // + // Also, the declared weight is the number of packets sent through + // particular link when there's been sent overall the number of + // packets equal to the sum of all weights. 
+ // + // Example 1 + // + // We have three links with weights: 10, 20, 30 + // + // Average: 60/3 = 20 + // deviation[10] = (20-10)/20 = 1/2 -> unit_load = 1.5 + // deviation[20] = (20-20)/20 = 0 -> unit_load = 1 + // deviation[30] = (20-30)/20 = -1/2 -> unit_load = 0.5 + // + // Example 2 + // + // We have two links with weights: 80, 20 + // + // Average: 100/2 = 50 + // deviation[20] = (50-20)/50 = 3/5 -> unit_load = 1 + 0.6 = 1.6 + // deviation[80] = (50-80)/50 = -3/5 -> unit_load = 1 - 0.6 = 0.4 + + double average = avg_weight; + double deviation = (average - li->weight) / average; + li->unit_load = 1 + deviation; + + HLOGC(dlog.Debug, log << "linkSelect_fixed: ... #" << distance(m_Group.begin(), li) + << " weight=" << li->weight << " deviation=" << (100*deviation) << "% unit-load=" + << li->unit_load << " current-load:" << li->load_factor); + + if (li->unit_load > biggest_unit_load) + biggest_unit_load = li->unit_load; + + if (li->load_factor < least_load) + { + HLOGC(dlog.Debug, log << "linkSelect_fixed: ... this link has currently smallest load"); + this_link = li; + least_load = li->load_factor; + } + } + + HLOGC(dlog.Debug, log << "linkSelect_fixed: selecting link #" << distance(m_Group.begin(), this_link)); + // Now that a link is selected and all load factors updated, + // do a CUTOFF by the value of at least one size of unit load. + + // This comparison can be used to recognize if all values of + // the load factor have already exceeded the value that should + // result in a cutoff. 
+ if (biggest_unit_load > 0 && least_load > 2 * biggest_unit_load) + { + for (gli_t i = m_Group.begin(); i != m_Group.end(); ++i) + { + if (i->sndstate != GST_RUNNING) + { + continue; + } + i->load_factor -= biggest_unit_load; + if (i->load_factor < 0) + i->load_factor = 0; + HLOGC(dlog.Debug, log << "linkSelect_fixed: cutting off value of " << biggest_unit_load + << " from load-factor:" << i->load_factor); + } + } + else + { + HLOGC(dlog.Debug, log << "linkSelect_fixed: not cutting off yet, biggest_unit_load=" + << biggest_unit_load << " (exp >0) && least_load=" << least_load << " (exp > 2 * " + << biggest_unit_load << ")"); + } + + // The above loop certainly found something. + return linkSelect_UpdateAndReport(this_link); +} + + diff --git a/srtcore/core.h b/srtcore/core.h index e41d43665..c70a368b0 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -218,6 +218,9 @@ class CUDTGroup bool ready_write; bool ready_error; + // Balancing data + double load_factor;// Current load on this link (cunulates unit_load values) + double unit_load; // Cost of sending, fixed or calc'd b.on network stats // Configuration int weight; }; @@ -348,6 +351,7 @@ class CUDTGroup int send(const char* buf, int len, SRT_MSGCTRL& w_mc); int sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc); int sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc); + int sendBalancing(const char* buf, int len, SRT_MSGCTRL& w_mc); private: // For Backup, sending all previous packet @@ -373,6 +377,7 @@ class CUDTGroup public: int recv(char* buf, int len, SRT_MSGCTRL& w_mc); + int recvBalancing(char* buf, int len, SRT_MSGCTRL& w_mc); void close(); @@ -623,20 +628,26 @@ class CUDTGroup { std::vector packet; SRT_MSGCTRL mctrl; - ReadPos(int32_t s): mctrl(srt_msgctrl_default) + ReadPos(int32_t s, SRT_GROUP_TYPE gt): mctrl(srt_msgctrl_default) { - mctrl.pktseq = s; + if (gt == SRT_GTYPE_BALANCING) + mctrl.msgno = s; + else + mctrl.pktseq = s; } }; std::map m_Positions; ReadPos* checkPacketAhead(); + 
ReadPos* checkPacketAheadMsgno(); // This is the sequence number of a packet that has been previously // delivered. Initially it should be set to SRT_SEQNO_NONE so that the sequence read // from the first delivering socket will be taken as a good deal. volatile int32_t m_RcvBaseSeqNo; + // Version used when using msgno synchronization. + volatile int32_t m_RcvBaseMsgNo; bool m_bOpened; // Set to true when at least one link is at least pending bool m_bConnected; // Set to true on first link confirmed connected bool m_bClosing; @@ -653,6 +664,49 @@ class CUDTGroup srt::sync::Mutex m_RcvDataLock; volatile int32_t m_iLastSchedSeqNo; // represetnts the value of CUDT::m_iSndNextSeqNo for each running socket volatile int32_t m_iLastSchedMsgNo; + unsigned int m_uBalancingRoll; + + /// This is initialized with some number that should be + /// decreased with every packet sent. Any decision and + /// analysis for a decision concerning balancing group behavior + /// should be taken only when this value is 0. During some + /// of the analysis steps this value may be reset to some + /// higer value so that for particular number of packets + /// no analysis is being done (this prevents taking measurement + /// data too early when the number of collected data was + /// too little and therefore any average is little reliable). + unsigned int m_RandomCredit; + + struct BalancingLinkState + { + gli_t ilink; // previously used link + int status; // 0 = normal first entry; -1 = repeated selection + int errorcode; + }; + typedef gli_t selectLink_cb(void*, const BalancingLinkState&); + CallbackHolder m_cbSelectLink; + + CUDTGroup::gli_t linkSelect_UpdateAndReport(CUDTGroup::gli_t this_link); + CUDTGroup::gli_t linkSelect_plain(const CUDTGroup::BalancingLinkState& state); + + // Plain algorithm: simply distribute the load + // on all links equally. 
+ gli_t linkSelect_fixed(const BalancingLinkState&);
+ static gli_t linkSelect_fixed_fw(void* opaq, const BalancingLinkState& st)
+ {
+ CUDTGroup* g = (CUDTGroup*)opaq;
+ return g->linkSelect_fixed(st);
+ }
+
+ // Window algorithm: keep balance, but mind the sending cost
+ // for every link basing on the flight window size. Keep links
+ // balanced according to the cost of sending.
+ gli_t linkSelect_window(const BalancingLinkState&);
+ static gli_t linkSelect_window_fw(void* opaq, const BalancingLinkState& st)
+ {
+ CUDTGroup* g = (CUDTGroup*)opaq;
+ return g->linkSelect_window(st);
+ }
public:
// Required after the call on newGroup on the listener side.
@@ -678,7 +732,12 @@
// this is going to be past the ISN, at worst it will be caused
// by TLPKTDROP.
m_RcvBaseSeqNo = SRT_SEQNO_NONE;
+ m_RcvBaseMsgNo = SRT_MSGNO_NONE;
}
+ int baseOffset(SRT_MSGCTRL& mctrl);
+ int baseOffset(ReadPos& pos);
+ bool seqDiscrepancy(SRT_MSGCTRL& mctrl);
+ bool msgDiscrepancy(SRT_MSGCTRL& mctrl);
bool applyGroupTime(time_point& w_start_time, time_point& w_peer_start_time)
{
@@ -1401,6 +1460,9 @@
{
m_iRcvCurrSeqNo = CSeqNo::decseq(isn);
}
+
+ volatile int m_iSndMinFlightSpan; // updated with every ACK, number of packets in flight at ACK
+
int32_t m_iISN; // Initial Sequence Number
bool m_bPeerTsbPd; // Peer accept TimeStamp-Based Rx mode
bool m_bPeerTLPktDrop; // Enable sender late packet dropping