Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] To fix missing new link update when no transmission on existing links #1891

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 22 additions & 26 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -714,29 +714,6 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen,
g->m_bConnected = true;
}

// XXX PROBLEM!!! These events are subscribed here so that this is done once, lazily,
// but groupwise connections could be accepted from multiple listeners for the same group!
// m_listener MUST BE A CONTAINER, NOT A POINTER!!!
// ALSO: Maybe checking "the same listener" is not necessary, as subscription may be done
// multiple times anyway?
if (!g->m_listener)
{
// Newly created group from the listener, which hasn't yet
// the listener set.
g->m_listener = ls;

// Listen on both first connected socket and continued sockets.
// This might help with jump-over situations, and in regular continued
// sockets the IN event won't be reported anyway.
int listener_modes = SRT_EPOLL_ACCEPT | SRT_EPOLL_UPDATE;
epoll_add_usock_INTERNAL(g->m_RcvEID, ls, &listener_modes);

// This listening should be done always when a first connected socket
// appears as accepted off the listener. This is for the sake of swait() calls
// inside the group receiving and sending functions so that they get
// interrupted when a new socket is connected.
}

// Add also per-direction subscription for the about-to-be-accepted socket.
// Both first accepted socket that makes the group-accept and every next
// socket that adds a new link.
Expand Down Expand Up @@ -797,6 +774,15 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen,
// acknowledge INTERNAL users waiting for new connections on the listening socket
// that are reported when a new socket is connected within an already connected group.
m_EPoll.update_events(listen, ls->core().m_sPollID, SRT_EPOLL_UPDATE, true);
#if ENABLE_BONDING
// Note that the code in this current IF branch can only be executed in case
// of group members. Otherwise should_submit_to_accept will be always true.
if (ns->m_GroupOf)
{
HLOGC(gmlog.Debug, log << "GROUP UPDATE $" << ns->m_GroupOf->id() << " per connected socket @" << ns->m_SocketID);
m_EPoll.update_events(ns->m_GroupOf->id(), ns->m_GroupOf->m_sPollID, SRT_EPOLL_UPDATE, true);
}
#endif
CGlobEvent::triggerEvent();
}

Expand Down Expand Up @@ -2385,8 +2371,7 @@ int srt::CUDTUnited::epoll_add_usock(const int eid, const SRTSOCKET u, const int
if (u & SRTGROUP_MASK)
{
GroupKeeper k(*this, u, ERH_THROW);
ret = m_EPoll.update_usock(eid, u, events);
k.group->addEPoll(eid);
ret = epoll_add_group_INTERNAL(eid, *k.group, events);
return 0;
}
#endif
Expand All @@ -2404,6 +2389,15 @@ int srt::CUDTUnited::epoll_add_usock(const int eid, const SRTSOCKET u, const int
return ret;
}

#if ENABLE_BONDING
// Subscribes the group `rg` (by its ID) in the epoll container `eid` for the
// given `events`, and then records `eid` in the group itself.
// Returns whatever the epoll subscription update returned.
int srt::CUDTUnited::epoll_add_group_INTERNAL(const int eid, CUDTGroup& rg, const int* events)
{
    // Update the subscription first; the group is told about the EID afterwards,
    // regardless of the value returned here.
    const int status = m_EPoll.update_usock(eid, rg.id(), events);

    rg.addEPoll(eid);

    return status;
}
#endif

// NOTE: WILL LOCK (serially):
// - CEPoll::m_EPollLock
// - CUDT::m_RecvLock
Expand Down Expand Up @@ -3513,7 +3507,9 @@ srt::CUDTGroup& srt::CUDT::newGroup(const int type)
const SRTSOCKET id = uglobal().generateSocketID(true);

// Now map the group
return uglobal().addGroup(id, SRT_GROUP_TYPE(type)).set_id(id);
CUDTGroup& rg = uglobal().addGroup(id, SRT_GROUP_TYPE(type)).set_id(id);
rg.subscribeUpdate();
return rg;
}

SRTSOCKET srt::CUDT::createGroup(SRT_GROUP_TYPE gt)
Expand Down
3 changes: 3 additions & 0 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,9 @@ class CUDTUnited
int epoll_clear_usocks(int eid);
int epoll_add_usock(const int eid, const SRTSOCKET u, const int* events = NULL);
int epoll_add_usock_INTERNAL(const int eid, CUDTSocket* s, const int* events);
#if ENABLE_BONDING
int epoll_add_group_INTERNAL(const int eid, CUDTGroup& rg, const int* events);
#endif
int epoll_add_ssock(const int eid, const SYSSOCKET s, const int* events = NULL);
int epoll_remove_usock(const int eid, const SRTSOCKET u);
template <class EntityType>
Expand Down
26 changes: 25 additions & 1 deletion srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype)
, m_GroupID(-1)
, m_PeerGroupID(-1)
, m_type(gtype)
, m_listener()
, m_iBusy()
, m_iSndOldestMsgNo(SRT_MSGNO_NONE)
, m_iSndAckedMsgNo(SRT_MSGNO_NONE)
Expand Down Expand Up @@ -4260,6 +4259,31 @@ void CUDTGroup::updateFailedLink()
}
}

// Subscribes the group's own receiving and sending EIDs to the UPDATE event
// of the group itself.
void CUDTGroup::subscribeUpdate()
{
    // The UPDATE event is meant to be raised each time a new socket joins
    // the group, with the exception of the very first one, which is the
    // one that makes the group open.
    const int self_update_events = SRT_EPOLL_UPDATE;

    // Both directions are subscribed:
    // - sending as well as receiving may block indefinitely (for receiving
    //   this is the more dangerous case);
    // - it cannot be decided up front that a group created by a caller will
    //   never get member connections from some listener. Nothing prevents
    //   taking a group obtained through `srt_groupof()` - that is, a group
    //   that came to existence through an accepted socket - and making
    //   a connection to it.
    //
    // NOTE: these calls may theoretically throw an exception, but only in
    // case of invalid parameters. No exception handling is needed here as
    // long as the parameters are surely verified before the call.
    CUDT::uglobal().epoll_add_group_INTERNAL(m_RcvEID, *this, &self_update_events);
    CUDT::uglobal().epoll_add_group_INTERNAL(m_SndEID, *this, &self_update_events);
}

#if ENABLE_HEAVY_LOGGING
// [[using maybe_locked(CUDT::uglobal()->m_GlobControlLock)]]
void CUDTGroup::debugGroup()
Expand Down
2 changes: 1 addition & 1 deletion srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,6 @@ class CUDTGroup
};
GroupContainer m_Group;
SRT_GROUP_TYPE m_type;
CUDTSocket* m_listener; // A "group" can only have one listener.
srt::sync::atomic<int> m_iBusy;
CallbackHolder<srt_connect_callback_fn> m_cbConnectHook;
void installConnectHook(srt_connect_callback_fn* hook, void* opaq)
Expand Down Expand Up @@ -791,6 +790,7 @@ class CUDTGroup
bool applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn);

/// @brief Subscribe the group's internal receiving and sending EIDs to the group's own UPDATE event.
void subscribeUpdate();

/// @brief Synchronize TSBPD base time and clock drift among members using the @a srcMember as a reference.
/// @param srcMember a reference for synchronization.
void synchronizeDrift(const srt::CUDT* srcMember);

Expand Down
107 changes: 106 additions & 1 deletion test/test_bonding.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@

#include <future>
#include <thread>
#include <chrono>
#include <vector>
#include <functional>

#include "gtest/gtest.h"
#include "test_env.h"

#include "srt.h"
#include "logging_api.h"
#include "udt.h"
#include "netinet_any.h"
#include <common.h>

TEST(Bonding, SRTConnectGroup)
{
Expand Down Expand Up @@ -561,3 +564,105 @@ TEST(Bonding, InitialFailure)
srt_close(lsn);
}

// Applies one large value (100000) to both the connection timeout and the
// peer idle timeout of `s`, so that long periods without any transmission
// in the test are tolerated. The results of setting the options are not checked.
void SetLongSilenceTolerant(const SRTSOCKET s)
{
    const int tolerant_timeout = 100000;
    const int optlen = sizeof tolerant_timeout;

    srt_setsockflag(s, SRTO_CONNTIMEO, &tolerant_timeout, optlen);
    srt_setsockflag(s, SRTO_PEERIDLETIMEO, &tolerant_timeout, optlen);
}

// Scenario: a backup group connects two links to a listener, one after another,
// with no data being sent in between. Then the first link is closed and a single
// payload is sent over the group. The accepting side must still receive that
// payload, which requires that it has noticed the second link even though
// nothing was transmitted over the first one.
TEST(Bonding, DeadLinkUpdate)
{
    using namespace std;
    using namespace std::chrono;

    srt::TestInit srtinit;

    SRTSOCKET listener = srt_create_socket();
    const SRTSOCKET group = srt_create_group(SRT_GTYPE_BACKUP);

    SetLongSilenceTolerant(listener);
    SetLongSilenceTolerant(group);

    srt::sockaddr_any sa(AF_INET);

    inet_pton(AF_INET, "127.0.0.1", sa.get_addr());

    sa.hport(5555);

    int allow_groups = 1;
    // No thread is running yet, so a fatal assertion is safe here. Without these
    // checks a bind/listen failure would surface only later, as a confusing
    // accept/connect failure.
    ASSERT_NE(srt_bind(listener, sa.get(), sa.size()), -1);
    ASSERT_NE(srt_listen(listener, 1), -1);
    srt_setsockflag(listener, SRTO_GROUPCONNECT, &allow_groups, sizeof allow_groups);
    char srcbuf [] = "1234ABCD";

    thread td = thread([&]() {
        cout << "[T] Connecting 1...\n";
        const SRTSOCKET member1 = srt_connect(group, sa.get(), sa.size());
        // Now wait 3s
        cout << "[T] Link 1 established. Wait 3s...\n";
        this_thread::sleep_for(seconds(3));

        cout << "[T] Connecting 2...\n";
        // Make a second connection
        srt_connect(group, sa.get(), sa.size());

        cout << "[T] Link 2 established. Wait 3s...\n";
        // Again wait 3s
        this_thread::sleep_for(seconds(3));

        cout << "[T] Killing link 1...\n";
        // Now close the first connection
        srt_close(member1);

        // Now send the data and see if they are received
        cout << "[T] Sending: size=" << (sizeof srcbuf) << " Content: '" << srcbuf << "'...\n";
        int nsent = srt_send(group, srcbuf, sizeof srcbuf);
        // Non-fatal on purpose: a fatal assertion would return from the lambda
        // and skip srt_close(group) below, leaving the receiving side blocked.
        EXPECT_NE(nsent, -1);

        cout << "[T] Wait 3s...\n";
        // Again wait 3s
        this_thread::sleep_for(seconds(3));

        cout << "[T] Killing the group and exitting.\n";
        // And close
        srt_close(group);
    });

    // Make sure the thread is joined on EVERY exit path. The fatal assertions
    // below return from the test body; destroying a still joinable std::thread
    // would call std::terminate() and kill the whole test executable.
    // Declared after `td` (and after everything the lambda captures by reference),
    // so it is destroyed - and the thread joined - before any of those objects.
    struct ThreadJoiner
    {
        std::thread& m_thread;
        explicit ThreadJoiner(std::thread& t): m_thread(t) {}
        ~ThreadJoiner()
        {
            if (m_thread.joinable())
                m_thread.join();
        }
    } td_guard(td);

    cout << "Accepting...\n";
    const SRTSOCKET acp = srt_accept(listener, NULL, NULL);

    // The mask check alone is not enough: -1 has all bits set, so a failed
    // accept would pass it.
    ASSERT_NE(acp, -1);
    ASSERT_EQ(acp & SRTGROUP_MASK, SRTGROUP_MASK);

    // Close and set up the listener again.
    srt_close(listener);
    listener = srt_create_socket();
    srt_bind(listener, sa.get(), sa.size());
    srt_listen(listener, 1);
    srt_setsockflag(listener, SRTO_GROUPCONNECT, &allow_groups, sizeof allow_groups);

    cout << "Group accepted. Receiving...\n";
    char buf[1316] = "";
    const int nrecv = srt_recv(acp, buf, 1316);
    int syserr, err;
    err = srt_getlasterror(&syserr);

    cout << "Received: val=" << nrecv << " Content: '" << buf << "'\n";
    if (nrecv == -1)
    {
        cout << "ERROR: " << srt_strerror(err, syserr) << endl;
        cout << "STATUS: " << srt_logging::SockStatusStr(srt_getsockstate(acp)) << endl;
    }

    cout << "Closing.\n";
    srt_close(acp);
    srt_close(listener);

    ASSERT_NE(nrecv, -1);

    EXPECT_EQ(strcmp(srcbuf, buf), 0);

    // Regular path: join explicitly; the guard then has nothing left to do.
    td.join();
}

Loading