From a3abea769f92ff59c588b5fd62293d08eb883dcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Thu, 25 Mar 2021 12:54:11 +0100 Subject: [PATCH 1/3] Added UT to show bug in group update --- test/filelist.maf | 2 +- test/test_bonding.cpp | 113 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 110 insertions(+), 5 deletions(-) diff --git a/test/filelist.maf b/test/filelist.maf index 77f0d4b1f..2bcef22ed 100644 --- a/test/filelist.maf +++ b/test/filelist.maf @@ -1,7 +1,6 @@ SOURCES test_buffer.cpp -test_bonding.cpp test_common.cpp test_connection_timeout.cpp test_many_connections.cpp @@ -24,4 +23,5 @@ test_utilities.cpp # Tests for bonding only - put here! SOURCES - ENABLE_EXPERIMENTAL_BONDING +test_bonding.cpp diff --git a/test/test_bonding.cpp b/test/test_bonding.cpp index 7d02ee5bb..bf249e4cf 100644 --- a/test/test_bonding.cpp +++ b/test/test_bonding.cpp @@ -1,4 +1,3 @@ - #include #include #include @@ -12,11 +11,11 @@ #include #endif -#if ENABLE_EXPERIMENTAL_BONDING - #include "gtest/gtest.h" #include "srt.h" +#include "logging_api.h" +#include "udt.h" #include "netinet_any.h" TEST(Bonding, SRTConnectGroup) @@ -336,4 +335,110 @@ TEST(Bonding, CloseGroupAndSocket) srt_cleanup(); } -#endif // ENABLE_EXPERIMENTAL_BONDING +void SetLongSilenceTolerant(const SRTSOCKET s) +{ + int longtime = 100000; + + srt_setsockflag(s, SRTO_CONNTIMEO, &longtime, sizeof longtime); + srt_setsockflag(s, SRTO_PEERIDLETIMEO, &longtime, sizeof longtime); +} + +TEST(Bonding, DeadLinkUpdate) +{ + using namespace std; + using namespace std::chrono; + + srt_startup(); + + { + using namespace srt; + using namespace srt_logging; + + setloglevel(LogLevel::debug); + set fas = { SRT_LOGFA_GRP_RECV }; + resetlogfa(fas); + } + + + SRTSOCKET listener = srt_create_socket(); + const SRTSOCKET group = srt_create_group(SRT_GTYPE_BACKUP); + + SetLongSilenceTolerant(listener); + SetLongSilenceTolerant(group); + + sockaddr_any sa(AF_INET); + + inet_pton(AF_INET, "127.0.0.1", sa.get_addr()); + + sa.hport(5555); + + int allow_groups = 1; + srt_bind(listener, sa.get(), sa.size()); + srt_listen(listener, 1); + srt_setsockflag(listener, SRTO_GROUPCONNECT, &allow_groups, sizeof allow_groups); + char srcbuf [] = "1234ABCD"; + + thread td = thread([&]() { + cout << "[T] Connecting 1...\n"; + const SRTSOCKET member1 = srt_connect(group, sa.get(), sa.size()); + // Now wait 3s + cout << "[T] Link 1 established. Wait 3s...\n"; + this_thread::sleep_for(seconds(3)); + + cout << "[T] Connecting 2...\n"; + // Make a second connection + srt_connect(group, sa.get(), sa.size()); + + cout << "[T] Link 2 established. Wait 3s...\n"; + // Again wait 3s + this_thread::sleep_for(seconds(3)); + + cout << "[T] Killing link 1...\n"; + // Now close the first connection + srt_close(member1); + + // Now send the data and see if they are received + cout << "[T] Sending: size=" << (sizeof srcbuf) << " Content: '" << srcbuf << "'...\n"; + int nsent = srt_send(group, srcbuf, sizeof srcbuf); + ASSERT_NE(nsent, -1); + + cout << "[T] Wait 3s...\n"; + // Again wait 3s + this_thread::sleep_for(seconds(3)); + + cout << "[T] Killing the group and exitting.\n"; + // And close + srt_close(group); + }); + + cout << "Accepting...\n"; + const SRTSOCKET acp = srt_accept(listener, NULL, NULL); + + ASSERT_EQ(acp & SRTGROUP_MASK, SRTGROUP_MASK); + + // Close and set up the listener again. + srt_close(listener); + listener = srt_create_socket(); + srt_bind(listener, sa.get(), sa.size()); + srt_listen(listener, 1); + srt_setsockflag(listener, SRTO_GROUPCONNECT, &allow_groups, sizeof allow_groups); + + cout << "Group accepted. Receiving...\n"; + char buf[1316] = ""; + const int nrecv = srt_recv(acp, buf, 1316); + + cout << "Received: val=" << nrecv << " Content: '" << buf << "'\n"; + + ASSERT_NE(nrecv, -1); + + EXPECT_EQ(strcmp(srcbuf, buf), 0); + + cout << "Closing.\n"; + srt_close(acp); + srt_close(listener); + + td.join(); + + srt_cleanup(); +} + From 0b8214610e61060a333dd23f728c841b82d847b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Thu, 25 Mar 2021 18:11:45 +0100 Subject: [PATCH 2/3] Fixed: added UPDATE event also for the group that was added a new socket --- srtcore/api.cpp | 48 ++++++++++++++++++++----------------------- srtcore/api.h | 3 +++ srtcore/group.cpp | 26 ++++++++++++++++++++++- srtcore/group.h | 2 +- test/test_bonding.cpp | 5 +++-- 5 files changed, 54 insertions(+), 30 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 4293d8021..6acba3a63 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -729,29 +729,6 @@ int CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr_any& peer, g->m_bConnected = true; } - // XXX PROLBEM!!! These events are subscribed here so that this is done once, lazily, - // but groupwise connections could be accepted from multiple listeners for the same group! - // m_listener MUST BE A CONTAINER, NOT POINTER!!! - // ALSO: Maybe checking "the same listener" is not necessary as subscruption may be done - // multiple times anyway? - if (!g->m_listener) - { - // Newly created group from the listener, which hasn't yet - // the listener set. - g->m_listener = ls; - - // Listen on both first connected socket and continued sockets. - // This might help with jump-over situations, and in regular continued - // sockets the IN event won't be reported anyway. - int listener_modes = SRT_EPOLL_ACCEPT | SRT_EPOLL_UPDATE; - epoll_add_usock_INTERNAL(g->m_RcvEID, ls, &listener_modes); - - // This listening should be done always when a first connected socket - // appears as accepted off the listener. This is for the sake of swait() calls - // inside the group receiving and sending functions so that they get - // interrupted when a new socket is connected. - } - // Add also per-direction subscription for the about-to-be-accepted socket. // Both first accepted socket that makes the group-accept and every next // socket that adds a new link. @@ -812,6 +789,15 @@ int CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr_any& peer, // acknowledge INTERNAL users waiting for new connections on the listening socket // that are reported when a new socket is connected within an already connected group. m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, SRT_EPOLL_UPDATE, true); +#if ENABLE_EXPERIMENTAL_BONDING + // Note that the code in this current IF branch can only be executed in case + // of group members. Otherwise should_submit_to_accept will be always true. + if (ns->m_GroupOf) + { + HLOGC(gmlog.Debug, log << "GROUP UPDATE $" << ns->m_GroupOf->id() << " per connected socket @" << ns->m_SocketID); + m_EPoll.update_events(ns->m_GroupOf->id(), ns->m_GroupOf->m_sPollID, SRT_EPOLL_UPDATE, true); + } +#endif CGlobEvent::triggerEvent(); } @@ -2360,8 +2346,7 @@ int CUDTUnited::epoll_add_usock( if (u & SRTGROUP_MASK) { GroupKeeper k (*this, u, ERH_THROW); - ret = m_EPoll.update_usock(eid, u, events); - k.group->addEPoll(eid); + ret = epoll_add_group_INTERNAL(eid, *k.group, events); return 0; } #endif @@ -2379,6 +2364,15 @@ int CUDTUnited::epoll_add_usock( return ret; } +#if ENABLE_EXPERIMENTAL_BONDING +int CUDTUnited::epoll_add_group_INTERNAL(const int eid, CUDTGroup& rg, const int* events) +{ + int ret = m_EPoll.update_usock(eid, rg.id(), events); + rg.addEPoll(eid); + return ret; +} +#endif + // NOTE: WILL LOCK (serially): // - CEPoll::m_EPollLock // - CUDT::m_RecvLock @@ -3159,7 +3153,9 @@ CUDTGroup& CUDT::newGroup(const int type) const SRTSOCKET id = s_UDTUnited.generateSocketID(true); // Now map the group - return s_UDTUnited.addGroup(id, SRT_GROUP_TYPE(type)).set_id(id); + CUDTGroup& rg = s_UDTUnited.addGroup(id, SRT_GROUP_TYPE(type)).set_id(id); + rg.subscribeUpdate(); + return rg; } SRTSOCKET CUDT::createGroup(SRT_GROUP_TYPE gt) diff --git a/srtcore/api.h b/srtcore/api.h index 35f215ca3..989fa93fe 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -270,6 +270,9 @@ friend class CRendezvousQueue; int epoll_clear_usocks(int eid); int epoll_add_usock(const int eid, const SRTSOCKET u, const int* events = NULL); int epoll_add_usock_INTERNAL(const int eid, CUDTSocket* s, const int* events); +#if ENABLE_EXPERIMENTAL_BONDING + int epoll_add_group_INTERNAL(const int eid, CUDTGroup& rg, const int* events); +#endif int epoll_add_ssock(const int eid, const SYSSOCKET s, const int* events = NULL); int epoll_remove_usock(const int eid, const SRTSOCKET u); template diff --git a/srtcore/group.cpp b/srtcore/group.cpp index b13caad08..229280afa 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -286,7 +286,6 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype) , m_selfManaged(true) , m_bSyncOnMsgNo(false) , m_type(gtype) - , m_listener() , m_iBusy() , m_iSndOldestMsgNo(SRT_MSGNO_NONE) , m_iSndAckedMsgNo(SRT_MSGNO_NONE) @@ -4662,6 +4661,31 @@ void CUDTGroup::updateFailedLink() } } +void CUDTGroup::subscribeUpdate() +{ + // In the beginning, subscribe the EIDs for yourself's UPDATE event. + // This event will be raised every time a new socket is added to the + // group, except the first one that makes the group open. + + int update_mode = SRT_EPOLL_UPDATE; + + // Subscribe both sending and receiving EID. + // Both sending and receiving can be blocked indefinitely + // (although for receiving it's more dangerous) as well as + // we cannot define from upside that a group created by a + // caller cannot then receive member connections from some + // listener (there's no problem with reusing a group extracted + // by `srt_groupof()` and make a connection to this group + // that was previously created by having been created by the + // accepted socket). + CUDT::s_UDTUnited.epoll_add_group_INTERNAL(m_RcvEID, *this, &update_mode); + CUDT::s_UDTUnited.epoll_add_group_INTERNAL(m_SndEID, *this, &update_mode); + // NOTE: this function theoretically may throw an exception, + // but that's only in case of invalid parameters. It doesn't + // need catching exceptions, as long as paramteres are surely + // verified before the call. +} + #if ENABLE_HEAVY_LOGGING // [[using maybe_locked(CUDT::s_UDTUnited.m_GlobControlLock)]] void CUDTGroup::debugGroup() diff --git a/srtcore/group.h b/srtcore/group.h index 4f014a849..d4466fe87 100644 --- a/srtcore/group.h +++ b/srtcore/group.h @@ -448,7 +448,6 @@ class CUDTGroup bool m_selfManaged; bool m_bSyncOnMsgNo; SRT_GROUP_TYPE m_type; - CUDTSocket* m_listener; // A "group" can only have one listener. int m_iBusy; CallbackHolder m_cbConnectHook; void installConnectHook(srt_connect_callback_fn* hook, void* opaq) @@ -814,6 +813,7 @@ class CUDTGroup void synchronizeDrift(CUDT* cu, duration udrift, time_point newtimebase); void updateLatestRcv(CUDTSocket*); + void subscribeUpdate(); // Property accessors SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRTSOCKET, id, m_GroupID); diff --git a/test/test_bonding.cpp b/test/test_bonding.cpp index bf249e4cf..c470f3cf8 100644 --- a/test/test_bonding.cpp +++ b/test/test_bonding.cpp @@ -350,14 +350,15 @@ TEST(Bonding, DeadLinkUpdate) srt_startup(); + /* logging enabled, for development { using namespace srt; using namespace srt_logging; setloglevel(LogLevel::debug); - set fas = { SRT_LOGFA_GRP_RECV }; + set fas = { SRT_LOGFA_GRP_RECV, SRT_LOGFA_GRP_MGMT }; resetlogfa(fas); - } + }*/ SRTSOCKET listener = srt_create_socket(); From 58b884dc99fc36a60189aa25fbad24c640082f23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Thu, 25 Mar 2021 18:34:44 +0100 Subject: [PATCH 3/3] Added more error information for a failing test --- test/test_bonding.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/test/test_bonding.cpp b/test/test_bonding.cpp index c470f3cf8..1ac5517a3 100644 --- a/test/test_bonding.cpp +++ b/test/test_bonding.cpp @@ -17,6 +17,7 @@ #include "logging_api.h" #include "udt.h" #include "netinet_any.h" +#include TEST(Bonding, SRTConnectGroup) { @@ -427,8 +428,15 @@ TEST(Bonding, DeadLinkUpdate) cout << "Group accepted. Receiving...\n"; char buf[1316] = ""; const int nrecv = srt_recv(acp, buf, 1316); + int syserr, err; + err = srt_getlasterror(&syserr); cout << "Received: val=" << nrecv << " Content: '" << buf << "'\n"; + if (nrecv == -1) + { + cout << "ERROR: " << srt_strerror(err, syserr) << endl; + cout << "STATUS: " << srt_logging::SockStatusStr(srt_getsockstate(acp)) << endl; + } ASSERT_NE(nrecv, -1);