Skip to content

Commit

Permalink
Updated. Added new test from Haivision#1891. Added doc info
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikolaj Malecki committed Sep 10, 2024
1 parent 97c96fa commit 90c2888
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 3 deletions.
5 changes: 5 additions & 0 deletions docs/API/API-socket-options.md
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,11 @@ allowed must take this into consideration. It's up to the caller of this
function to make this distinction and to take appropriate action depending on
the type of entity returned.

Note: this flag should be altered **before** calling `srt_listen`. If you do
this after this call, you might have some pending group connections in the
meantime that will be rejected because group connections are not **yet**
allowed on this listener socket.

When this flag is set to 1 on an accepted socket that is passed to the
listener callback handler, it means that this socket is created for a group
connection and it will become a member of a group. Note that in this case
Expand Down
131 changes: 128 additions & 3 deletions test/test_bonding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@

#include "gtest/gtest.h"
#include "test_env.h"
#include "apputil.hpp" // Note: declares CreateAddr, but not srt::CreateAddr

#include "srt.h"
#include "logging_api.h"
#include "common.h"
#include "netinet_any.h"

TEST(Bonding, SRTConnectGroup)
Expand Down Expand Up @@ -376,7 +379,7 @@ TEST(Bonding, Options)
#endif
int allow = 1;
ASSERT_NE(srt_setsockflag(lsn, SRTO_GROUPCONNECT, &allow, sizeof allow), SRT_ERROR);
sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET);
sockaddr_any sa = srt::CreateAddr("127.0.0.1", 5555, AF_INET);
ASSERT_NE(srt_bind(lsn, sa.get(), sa.size()), SRT_ERROR);
ASSERT_NE(srt_listen(lsn, 1), SRT_ERROR);
started = true;
Expand Down Expand Up @@ -413,7 +416,7 @@ TEST(Bonding, Options)
}

// Now the thread is accepting, so we call the connect.
sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET);
sockaddr_any sa = srt::CreateAddr("127.0.0.1", 5555, AF_INET);
SRTSOCKET member = srt_connect(grp, sa.get(), sa.size());

// We've released the mutex and signaled the CV, so accept should proceed now.
Expand Down Expand Up @@ -507,7 +510,7 @@ TEST(Bonding, InitialFailure)
int allow = 1;
ASSERT_NE(srt_setsockflag(lsn, SRTO_GROUPCONNECT, &allow, sizeof allow), SRT_ERROR);

sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET);
sockaddr_any sa = srt::CreateAddr("127.0.0.1", 5555, AF_INET);
ASSERT_NE(srt_bind(lsn, sa.get(), sa.size()), SRT_ERROR);
ASSERT_NE(srt_listen(lsn, 5), SRT_ERROR);

Expand Down Expand Up @@ -564,3 +567,125 @@ TEST(Bonding, InitialFailure)
srt_close(lsn);
}

void SetLongSilenceTolerant(const SRTSOCKET s)
{
int longtime = 100000;

srt_setsockflag(s, SRTO_CONNTIMEO, &longtime, sizeof longtime);
srt_setsockflag(s, SRTO_PEERIDLETIMEO, &longtime, sizeof longtime);
}

TEST(Bonding, DeadLinkUpdate)
{
using namespace std;
using namespace std::chrono;

srt::TestInit srtinit;

SRTSOCKET listener = srt_create_socket();
const SRTSOCKET group = srt_create_group(SRT_GTYPE_BACKUP);

SetLongSilenceTolerant(listener);
SetLongSilenceTolerant(group);

srt::sockaddr_any sa(AF_INET);

inet_pton(AF_INET, "127.0.0.1", sa.get_addr());

sa.hport(5555);

srt_bind(listener, sa.get(), sa.size());
srt::setopt(listener)[SRTO_GROUPCONNECT] = 1;
srt_listen(listener, 1);
char srcbuf [] = "1234ABCD";

thread td = thread([&]() {
cout << "[T] Connecting 1...\n";
const SRTSOCKET member1 = srt_connect(group, sa.get(), sa.size());
EXPECT_NE(member1, SRT_INVALID_SOCK);
// Now wait 3s
cout << "[T] Link 1 established. Wait 3s...\n";
this_thread::sleep_for(seconds(3));

cout << "[T] Connecting 2...\n";
// Make a second connection
const SRTSOCKET member2 = srt_connect(group, sa.get(), sa.size());
EXPECT_NE(member2, SRT_INVALID_SOCK);

if (member2 == SRT_INVALID_SOCK || member1 == SRT_INVALID_SOCK)
{
srt_close(member1);
srt_close(member2);
cout << "[T] Test already failed, exitting\n";
return;
}

cout << "[T] Link 2 established. Wait 3s...\n";
// Again wait 3s
this_thread::sleep_for(seconds(3));

cout << "[T] Killing link 1...\n";
// Now close the first connection
srt_close(member1);

// Now send the data and see if they are received
cout << "[T] Sending: size=" << (sizeof srcbuf) << " Content: '" << srcbuf << "'...\n";
int nsent = srt_send(group, srcbuf, sizeof srcbuf);
EXPECT_NE(nsent, -1) << "srt_send:" << srt_getlasterror_str();

cout << "[T] Wait 3s...\n";
// Again wait 3s
this_thread::sleep_for(seconds(3));

cout << "[T] Killing the group and exitting.\n";
// And close
srt_close(group);

cout << "[T] exit\n";
});

cout << "Accepting (10s timeout)...\n";

SRTSOCKET lsnra [] = { listener };

// Using srt_accept_bond to apply accept timeout
const SRTSOCKET acp = srt_accept_bond(lsnra, 1, 10*1000);

EXPECT_NE(acp, -1) << "srt_accept:" << srt_getlasterror_str();
EXPECT_EQ(acp & SRTGROUP_MASK, SRTGROUP_MASK);

// Close and set up the listener again.
srt_close(listener);
if (acp != SRT_ERROR)
{
listener = srt_create_socket();
srt_bind(listener, sa.get(), sa.size());
srt::setopt(listener)[SRTO_GROUPCONNECT] = 1;
srt_listen(listener, 1);

cout << "Group accepted. Receiving...\n";
char buf[1316] = "";
const int nrecv = srt_recv(acp, buf, 1316);
int syserr, err;
err = srt_getlasterror(&syserr);
EXPECT_NE(nrecv, -1) << "srt_recv:" << srt_getlasterror_str();

cout << "Received: val=" << nrecv << " Content: '" << buf << "'\n";
if (nrecv == -1)
{
cout << "ERROR: " << srt_strerror(err, syserr) << endl;
cout << "STATUS: " << srt_logging::SockStatusStr(srt_getsockstate(acp)) << endl;
}
else
{
EXPECT_EQ(strcmp(srcbuf, buf), 0);
}

cout << "Closing.\n";
srt_close(acp);
srt_close(listener);
}

td.join();
}

0 comments on commit 90c2888

Please sign in to comment.