diff --git a/srtcore/api.cpp b/srtcore/api.cpp index bb5dd64fe..8714a6243 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -714,29 +714,6 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, g->m_bConnected = true; } - // XXX PROLBEM!!! These events are subscribed here so that this is done once, lazily, - // but groupwise connections could be accepted from multiple listeners for the same group! - // m_listener MUST BE A CONTAINER, NOT POINTER!!! - // ALSO: Maybe checking "the same listener" is not necessary as subscruption may be done - // multiple times anyway? - if (!g->m_listener) - { - // Newly created group from the listener, which hasn't yet - // the listener set. - g->m_listener = ls; - - // Listen on both first connected socket and continued sockets. - // This might help with jump-over situations, and in regular continued - // sockets the IN event won't be reported anyway. - int listener_modes = SRT_EPOLL_ACCEPT | SRT_EPOLL_UPDATE; - epoll_add_usock_INTERNAL(g->m_RcvEID, ls, &listener_modes); - - // This listening should be done always when a first connected socket - // appears as accepted off the listener. This is for the sake of swait() calls - // inside the group receiving and sending functions so that they get - // interrupted when a new socket is connected. - } - // Add also per-direction subscription for the about-to-be-accepted socket. // Both first accepted socket that makes the group-accept and every next // socket that adds a new link. @@ -797,6 +774,15 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, // acknowledge INTERNAL users waiting for new connections on the listening socket // that are reported when a new socket is connected within an already connected group. m_EPoll.update_events(listen, ls->core().m_sPollID, SRT_EPOLL_UPDATE, true); +#if ENABLE_BONDING + // Note that the code in this current IF branch can only be executed in case + // of group members. Otherwise should_submit_to_accept will be always true. + if (ns->m_GroupOf) + { + HLOGC(gmlog.Debug, log << "GROUP UPDATE $" << ns->m_GroupOf->id() << " per connected socket @" << ns->m_SocketID); + m_EPoll.update_events(ns->m_GroupOf->id(), ns->m_GroupOf->m_sPollID, SRT_EPOLL_UPDATE, true); + } +#endif CGlobEvent::triggerEvent(); } @@ -2385,8 +2371,7 @@ int srt::CUDTUnited::epoll_add_usock(const int eid, const SRTSOCKET u, const int if (u & SRTGROUP_MASK) { GroupKeeper k(*this, u, ERH_THROW); - ret = m_EPoll.update_usock(eid, u, events); - k.group->addEPoll(eid); + ret = epoll_add_group_INTERNAL(eid, *k.group, events); return 0; } #endif @@ -2404,6 +2389,15 @@ int srt::CUDTUnited::epoll_add_usock(const int eid, const SRTSOCKET u, const int return ret; } +#if ENABLE_BONDING +int srt::CUDTUnited::epoll_add_group_INTERNAL(const int eid, CUDTGroup& rg, const int* events) +{ + int ret = m_EPoll.update_usock(eid, rg.id(), events); + rg.addEPoll(eid); + return ret; +} +#endif + // NOTE: WILL LOCK (serially): // - CEPoll::m_EPollLock // - CUDT::m_RecvLock @@ -3513,7 +3507,9 @@ srt::CUDTGroup& srt::CUDT::newGroup(const int type) const SRTSOCKET id = uglobal().generateSocketID(true); // Now map the group - return uglobal().addGroup(id, SRT_GROUP_TYPE(type)).set_id(id); + CUDTGroup& rg = uglobal().addGroup(id, SRT_GROUP_TYPE(type)).set_id(id); + rg.subscribeUpdate(); + return rg; } SRTSOCKET srt::CUDT::createGroup(SRT_GROUP_TYPE gt) diff --git a/srtcore/api.h b/srtcore/api.h index b5d6be915..ff085f7e6 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -322,6 +322,9 @@ class CUDTUnited int epoll_clear_usocks(int eid); int epoll_add_usock(const int eid, const SRTSOCKET u, const int* events = NULL); int epoll_add_usock_INTERNAL(const int eid, CUDTSocket* s, const int* events); +#if ENABLE_BONDING + int epoll_add_group_INTERNAL(const int eid, CUDTGroup& rg, const int* events); +#endif int epoll_add_ssock(const int eid, const SYSSOCKET s, const int* events = NULL); int epoll_remove_usock(const int eid, const SRTSOCKET u); template diff --git a/srtcore/group.cpp b/srtcore/group.cpp index 5601cdeee..d84469619 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -252,7 +252,6 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype) , m_GroupID(-1) , m_PeerGroupID(-1) , m_type(gtype) - , m_listener() , m_iBusy() , m_iSndOldestMsgNo(SRT_MSGNO_NONE) , m_iSndAckedMsgNo(SRT_MSGNO_NONE) @@ -4260,6 +4259,31 @@ void CUDTGroup::updateFailedLink() } } +void CUDTGroup::subscribeUpdate() +{ + // In the beginning, subscribe the EIDs for yourself's UPDATE event. + // This event will be raised every time a new socket is added to the + // group, except the first one that makes the group open. + + int update_mode = SRT_EPOLL_UPDATE; + + // Subscribe both sending and receiving EID. + // Both sending and receiving can be blocked indefinitely + // (although for receiving it's more dangerous) as well as + // we cannot define from upside that a group created by a + // caller cannot then receive member connections from some + // listener (there's no problem with reusing a group extracted + // by `srt_groupof()` and make a connection to this group + // that was previously created by having been created by the + // accepted socket). + CUDT::uglobal().epoll_add_group_INTERNAL(m_RcvEID, *this, &update_mode); + CUDT::uglobal().epoll_add_group_INTERNAL(m_SndEID, *this, &update_mode); + // NOTE: this function theoretically may throw an exception, + // but that's only in case of invalid parameters. It doesn't + // need catching exceptions, as long as paramteres are surely + // verified before the call. +} + #if ENABLE_HEAVY_LOGGING // [[using maybe_locked(CUDT::uglobal()->m_GlobControlLock)]] void CUDTGroup::debugGroup() diff --git a/srtcore/group.h b/srtcore/group.h index 56f1456f8..ba8edf86e 100644 --- a/srtcore/group.h +++ b/srtcore/group.h @@ -442,7 +442,6 @@ class CUDTGroup }; GroupContainer m_Group; SRT_GROUP_TYPE m_type; - CUDTSocket* m_listener; // A "group" can only have one listener. srt::sync::atomic m_iBusy; CallbackHolder m_cbConnectHook; void installConnectHook(srt_connect_callback_fn* hook, void* opaq) @@ -791,6 +790,7 @@ class CUDTGroup bool applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn); /// @brief Synchronize TSBPD base time and clock drift among members using the @a srcMember as a reference. + void subscribeUpdate(); /// @param srcMember a reference for synchronization. void synchronizeDrift(const srt::CUDT* srcMember); diff --git a/test/test_bonding.cpp b/test/test_bonding.cpp index 3aba0b695..547a97c9c 100644 --- a/test/test_bonding.cpp +++ b/test/test_bonding.cpp @@ -1,14 +1,17 @@ + #include #include #include #include #include - #include "gtest/gtest.h" #include "test_env.h" #include "srt.h" +#include "logging_api.h" +#include "udt.h" #include "netinet_any.h" +#include TEST(Bonding, SRTConnectGroup) { @@ -561,3 +564,105 @@ TEST(Bonding, InitialFailure) srt_close(lsn); } +void SetLongSilenceTolerant(const SRTSOCKET s) +{ + int longtime = 100000; + + srt_setsockflag(s, SRTO_CONNTIMEO, &longtime, sizeof longtime); + srt_setsockflag(s, SRTO_PEERIDLETIMEO, &longtime, sizeof longtime); +} + +TEST(Bonding, DeadLinkUpdate) +{ + using namespace std; + using namespace std::chrono; + + srt::TestInit srtinit; + + SRTSOCKET listener = srt_create_socket(); + const SRTSOCKET group = srt_create_group(SRT_GTYPE_BACKUP); + + SetLongSilenceTolerant(listener); + SetLongSilenceTolerant(group); + + srt::sockaddr_any sa(AF_INET); + + inet_pton(AF_INET, "127.0.0.1", sa.get_addr()); + + sa.hport(5555); + + int allow_groups = 1; + srt_bind(listener, sa.get(), sa.size()); + srt_listen(listener, 1); + srt_setsockflag(listener, SRTO_GROUPCONNECT, &allow_groups, sizeof allow_groups); + char srcbuf [] = "1234ABCD"; + + thread td = thread([&]() { + cout << "[T] Connecting 1...\n"; + const SRTSOCKET member1 = srt_connect(group, sa.get(), sa.size()); + // Now wait 3s + cout << "[T] Link 1 established. Wait 3s...\n"; + this_thread::sleep_for(seconds(3)); + + cout << "[T] Connecting 2...\n"; + // Make a second connection + srt_connect(group, sa.get(), sa.size()); + + cout << "[T] Link 2 established. Wait 3s...\n"; + // Again wait 3s + this_thread::sleep_for(seconds(3)); + + cout << "[T] Killing link 1...\n"; + // Now close the first connection + srt_close(member1); + + // Now send the data and see if they are received + cout << "[T] Sending: size=" << (sizeof srcbuf) << " Content: '" << srcbuf << "'...\n"; + int nsent = srt_send(group, srcbuf, sizeof srcbuf); + ASSERT_NE(nsent, -1); + + cout << "[T] Wait 3s...\n"; + // Again wait 3s + this_thread::sleep_for(seconds(3)); + + cout << "[T] Killing the group and exitting.\n"; + // And close + srt_close(group); + }); + + cout << "Accepting...\n"; + const SRTSOCKET acp = srt_accept(listener, NULL, NULL); + + ASSERT_EQ(acp & SRTGROUP_MASK, SRTGROUP_MASK); + + // Close and set up the listener again. + srt_close(listener); + listener = srt_create_socket(); + srt_bind(listener, sa.get(), sa.size()); + srt_listen(listener, 1); + srt_setsockflag(listener, SRTO_GROUPCONNECT, &allow_groups, sizeof allow_groups); + + cout << "Group accepted. Receiving...\n"; + char buf[1316] = ""; + const int nrecv = srt_recv(acp, buf, 1316); + int syserr, err; + err = srt_getlasterror(&syserr); + + cout << "Received: val=" << nrecv << " Content: '" << buf << "'\n"; + if (nrecv == -1) + { + cout << "ERROR: " << srt_strerror(err, syserr) << endl; + cout << "STATUS: " << srt_logging::SockStatusStr(srt_getsockstate(acp)) << endl; + } + + cout << "Closing.\n"; + srt_close(acp); + srt_close(listener); + + ASSERT_NE(nrecv, -1); + + EXPECT_EQ(strcmp(srcbuf, buf), 0); + + td.join(); +} +