Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[apps] Adding alternative group connect callback for testing #1651

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions testing/srt-test-live.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ int main( int argc, char** argv )
o_hook ((optargs), "<hookspec> Use listener callback of given specification (internally coded)", "hook"),
#if ENABLE_EXPERIMENTAL_BONDING
o_group ((optargs), "<URIs...> Using multiple SRT connections as redundancy group", "g"),
o_mxptool ((optargs), " Use mxptool style to reconnect failed group nodes", "mxp", "mxptool"),
#endif
o_stime ((optargs), " Pass source time explicitly to SRT output", "st", "srctime", "sourcetime"),
o_retry ((optargs), "<N=-1,0,+N> Retry connection N times if failed on timeout", "rc", "retry"),
Expand Down Expand Up @@ -659,6 +660,10 @@ int main( int argc, char** argv )
bool internal_log = OptionPresent(params, o_logint);
bool skip_flushing = OptionPresent(params, o_skipflush);

#if ENABLE_EXPERIMENTAL_BONDING
transmit_groupreconn_mxptool = OptionPresent(params, o_mxptool);
#endif

string hook = Option<OutString>(params, "", o_hook);
if (hook != "")
{
Expand Down
188 changes: 187 additions & 1 deletion testing/testmedia.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ bool transmit_printformat_json = false;
srt_listen_callback_fn* transmit_accept_hook_fn = nullptr;
void* transmit_accept_hook_op = nullptr;
bool transmit_use_sourcetime = false;
bool transmit_groupreconn_mxptool = false;
int transmit_retry_connect = 0;
bool transmit_retry_always = false;

Expand Down Expand Up @@ -890,6 +891,188 @@ void SrtCommon::PrepareClient()
}

#if ENABLE_EXPERIMENTAL_BONDING
#if 0
void SrtConn_ConnectCB(void *opaq, SRTSOCKET ns, int errorcode, const struct sockaddr *psaPeer, int token)
{
char szEndPoint[SRTC_ENDPOINTLEN];
SrtConn *srtc = (SrtConn *)opaq;
int iLID = token-1;
int iRejectReason = SRT_REJ_UNKNOWN;
srtcLog(srtc, LOG_DEBUG, "connectCB(%p,%x,%d,%s,%d)\n",
opaq, ns, errorcode,
srtconn_StrEndPoint(srtc, psaPeer, szEndPoint, sizeof(szEndPoint), NULL),
token
#if 0 //SIGABORT Haivision/srt issue #1585
,errorcode ? srt_strerror(errorcode,0): ""
#endif
);
if(srtc->cfg.group.type) {
if (token < 1 || token > SRTC_GROUP_MAXLINKS){
srtcLog(srtc, LOG_ERR, "sock[%x] %s invalid token %d vs. [1:%d]\n",ns,
srtconn_StrEndPoint(srtc, psaPeer, szEndPoint, sizeof(szEndPoint), NULL),
token, SRTC_GROUP_MAXLINKS);
return;
}
}else{
return; //ignore non-group calls (token == -1)
}
switch(errorcode) {
case SRT_SUCCESS:
srtcLog(srtc, LOG_NOTICE, "Link established %s: %s\n",
SRTC_CONNMODE_SERVER == srtc->rt.connmode ? "from" : "to",
srtconn_StrEndPoint(srtc, psaPeer, szEndPoint, sizeof(szEndPoint), NULL));
srtc->rt.group_data[iLID].id = ns;
break;
case SRT_ENOSERVER:
case SRT_ECONNREJ:
//other cases? >>FIXME
iRejectReason = srt_getrejectreason(ns);
//no break, fall though
default:
{
int uerrc;
if (iRejectReason != SRT_REJ_UNKNOWN) {
srtcLog(srtc, LOG_NOTICE, "Link %d with %s failed: err=%d(%s),reason=%d(%s)\n", token,
srtconn_StrEndPoint(srtc, psaPeer, szEndPoint, sizeof(szEndPoint), NULL),
errorcode, srt_strerror(errorcode,0),
iRejectReason, srt_rejectreason_str(iRejectReason));
}else
{
srtcLog(srtc, LOG_NOTICE, "Link %d with %s failed: err=%d(%s)\n", token,
srtconn_StrEndPoint(srtc, psaPeer, szEndPoint, sizeof(szEndPoint), NULL),
errorcode, srt_strerror(errorcode,0));
}
//invalidate link socket to prevent get stats errlog
srtc->rt.group_data[iLID].id = SRT_INVALID_SOCK;
if (srtc->flags & MXPC_F_RECONNECT) {
SRT_SOCKGROUPCONFIG ssgcLink[1];
char szEndPoint[SRTC_ENDPOINTLEN];
//grab link config and reconnect
#if 0 // SRT_HAS_ISSUE1603
/* >>work around github/srt issue #1603
broken socket are deleted after being reported by srt_sendmsg2() and srt_recvmsg2()
and accumulate if no packet transit, which is the case before initial connection following
multiple failed connection attempts.
*/
if( SRT_ERROR == srt_close(ns)){
srtcLog(srtc, LOG_ERR, "link %d str_close(%x) failed %s\n",iLID+1,ns,
srt_getlasterror_str());
}else{
srtcLog(srtc, LOG_DEBUG, "link %d str_close(%x) succeeded\n",iLID+1, ns);
}
#endif
/* Get link initial configuration to reconnect */
memcpy(&ssgcLink[0], &srtc->cfg.group.ssgcLink[iLID], sizeof(ssgcLink[0]));
srtcLog(srtc, LOG_DEBUG, "str_connect_group for link %d: %s w:%hu\n", iLID+1,
srtconn_StrEndPoint(srtc, (struct sockaddr *)&ssgcLink[0].peeraddr, szEndPoint, sizeof(szEndPoint), NULL),
ssgcLink[0].weight);
if (SRT_ERROR == srt_connect_group(srtc->sock.u, ssgcLink, 1)) {
uerrc = srt_getlasterror(NULL);
switch(uerrc) {
case EINPROGRESS:
srtcLog(srtc, LOG_DEBUG, "EINPROGRESS:connecting group link %d: %s w:%hu\n", iLID+1,
srtconn_StrEndPoint(srtc, (struct sockaddr *)&ssgcLink[0].peeraddr, szEndPoint, sizeof(szEndPoint), NULL),
ssgcLink[0].weight);
break;
case SRT_ENOSERVER:
case SRT_ECONNREJ:
{
int rejreason = srt_getrejectreason(srtc->sock.u);
srtcLog(srtc, LOG_ERR, "could not connect, %s: %s\n",
uerrc == SRT_ENOSERVER ? "timed out" : "rejected",
srt_rejectreason_str(rejreason));
}
break;
default:
srtcLog(srtc, LOG_ERR, "could not connect: %s\n", srt_getlasterror_str());
break;
}
} else {
//>> Unlike srt_connect() SRT_SUCCESS(0) is returned instead of EINPROGRESS for non-blocking mode
srtcLog(srtc, LOG_DEBUG, "0:reconnecting group link %d: %s w:%hu\n", iLID+1,
srtconn_StrEndPoint(srtc, (struct sockaddr *)&ssgcLink[0].peeraddr, szEndPoint, sizeof(szEndPoint), NULL),
ssgcLink[0].weight);
}
}
}
break;
}
return;
}
#endif

void TransmitMxptoolConnectCallback(void *srtcommon, SRTSOCKET ns, int errorcode, const struct sockaddr *psaPeer, int token)
{
extern srt_logging::Logger applog;
SrtCommon* that = (SrtCommon*)srtcommon;
sockaddr_any peer(psaPeer);
applog.Debug("TransmitMxptoolConnectCallback: @", ns, " code=", errorcode, " adr=", peer.str(), " token=", token);

int reject_reason = SRT_REJ_UNKNOWN;

SrtCommon::Connection* this_node = nullptr;
if (!that->m_group_nodes.empty())
{
// Find the token, or report invalid
for (auto& n: that->m_group_nodes)
{
if (n.token == -1)
{
applog.Debug("TransmitMxptoolConnectCallback: NODE WITH NO TOKEN: ", n.host, ":", n.port);
continue;
}

if (n.token == token)
{
applog.Debug("TransmitMxptoolConnectCallback: FOUND NODE by: ", n.host, ":", n.port);
this_node = &n;
break;
}
}
}

if (this_node == nullptr)
{
applog.Error("TransmitMxptoolConnectCallback: INVALID TOKEN: ", token);
return;
}

if (errorcode == SRT_SUCCESS)
return; // connection successful - nothing to be done

if (errorcode == SRT_ENOSERVER || errorcode == SRT_ECONNREJ)
reject_reason = srt_getrejectreason(ns);

// don't overwrite so that the main reconnector remains unaware
//this_node->socket = SRT_INVALID_SOCK;

const sockaddr* source = this_node->source.empty() ? nullptr : this_node->source.get();
SRT_SOCKGROUPCONFIG gd = srt_prepare_endpoint(source, this_node->target.get(), this_node->target.size());
gd.weight = this_node->weight;
gd.config = this_node->options;
gd.token = this_node->token;

applog.Debug("TransmitMxptoolConnectCallback: connection ", reject_reason == SRT_REJ_UNKNOWN ? " failed. " : "rejected: ",
srt_rejectreason_str(reject_reason));

applog.Debug("TransmitMxptoolConnectCallback: will reconnect: [",
sockaddr_any(gd.srcaddr).str(), "] -> ", sockaddr_any(gd.peeraddr).str(),
" weight=", gd.weight);

int st = srt_connect_group(that->m_sock, &gd, 1);
if (st == SRT_ERROR)
{
int errc = srt_getlasterror(nullptr);
applog.Error("TransmitMxptoolConnectCallback: ERROR code=", errc);
}
else
{
applog.Debug("TransmitMxptoolConnectCallback: Reconnection started successfully");
this_node->socket = st;
}
}


void TransmitGroupSocketConnect(void* srtcommon, SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token)
{
SrtCommon* that = (SrtCommon*)srtcommon;
Expand Down Expand Up @@ -954,7 +1137,10 @@ void SrtCommon::OpenGroupClient()
if (m_sock == -1)
Error("srt_create_group");

srt_connect_callback(m_sock, &TransmitGroupSocketConnect, this);
if (transmit_groupreconn_mxptool)
srt_connect_callback(m_sock, &TransmitMxptoolConnectCallback, this);
else
srt_connect_callback(m_sock, &TransmitGroupSocketConnect, this);

int stat = -1;
if (m_group_config != "")
Expand Down
2 changes: 2 additions & 0 deletions testing/testmedia.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

extern srt_listen_callback_fn* transmit_accept_hook_fn;
extern void* transmit_accept_hook_op;
extern bool transmit_groupreconn_mxptool;

extern std::shared_ptr<SrtStatsWriter> transmit_stats_writer;

Expand Down Expand Up @@ -50,6 +51,7 @@ class SrtCommon
protected:

friend void TransmitGroupSocketConnect(void* srtcommon, SRTSOCKET sock, int error, const sockaddr* peer, int token);
friend void TransmitMxptoolConnectCallback(void *srtcommon, SRTSOCKET ns, int errorcode, const struct sockaddr *psaPeer, int token);

struct ConnectionBase
{
Expand Down