diff --git a/testing/srt-test-live.cpp b/testing/srt-test-live.cpp index daa3efc29..fff398d76 100644 --- a/testing/srt-test-live.cpp +++ b/testing/srt-test-live.cpp @@ -449,6 +449,7 @@ int main( int argc, char** argv ) o_hook ((optargs), " Use listener callback of given specification (internally coded)", "hook"), #if ENABLE_EXPERIMENTAL_BONDING o_group ((optargs), " Using multiple SRT connections as redundancy group", "g"), + o_mxptool ((optargs), " Use mxptool style to reconnect failed group nodes", "mxp", "mxptool"), #endif o_stime ((optargs), " Pass source time explicitly to SRT output", "st", "srctime", "sourcetime"), o_retry ((optargs), " Retry connection N times if failed on timeout", "rc", "retry"), @@ -659,6 +660,10 @@ int main( int argc, char** argv ) bool internal_log = OptionPresent(params, o_logint); bool skip_flushing = OptionPresent(params, o_skipflush); +#if ENABLE_EXPERIMENTAL_BONDING + transmit_groupreconn_mxptool = OptionPresent(params, o_mxptool); +#endif + string hook = Option(params, "", o_hook); if (hook != "") { diff --git a/testing/testmedia.cpp b/testing/testmedia.cpp index 9324733b1..1334137f7 100755 --- a/testing/testmedia.cpp +++ b/testing/testmedia.cpp @@ -56,6 +56,7 @@ bool transmit_printformat_json = false; srt_listen_callback_fn* transmit_accept_hook_fn = nullptr; void* transmit_accept_hook_op = nullptr; bool transmit_use_sourcetime = false; +bool transmit_groupreconn_mxptool = false; int transmit_retry_connect = 0; bool transmit_retry_always = false; @@ -890,6 +891,188 @@ void SrtCommon::PrepareClient() } #if ENABLE_EXPERIMENTAL_BONDING +#if 0 +void SrtConn_ConnectCB(void *opaq, SRTSOCKET ns, int errorcode, const struct sockaddr *psaPeer, int token) +{ + char szEndPoint[SRTC_ENDPOINTLEN]; + SrtConn *srtc = (SrtConn *)opaq; + int iLID = token-1; + int iRejectReason = SRT_REJ_UNKNOWN; + srtcLog(srtc, LOG_DEBUG, "connectCB(%p,%x,%d,%s,%d)\n", + opaq, ns, errorcode, + srtconn_StrEndPoint(srtc, psaPeer, szEndPoint, sizeof(szEndPoint), NULL), + token +#if 0 //SIGABORT Haivision/srt issue #1585 + ,errorcode ? srt_strerror(errorcode,0): "" +#endif + ); + if(srtc->cfg.group.type) { + if (token < 1 || token > SRTC_GROUP_MAXLINKS){ + srtcLog(srtc, LOG_ERR, "sock[%x] %s invalid token %d vs. [1:%d]\n",ns, + srtconn_StrEndPoint(srtc, psaPeer, szEndPoint, sizeof(szEndPoint), NULL), + token, SRTC_GROUP_MAXLINKS); + return; + } + }else{ + return; //ignore non-group calls (token == -1) + } + switch(errorcode) { + case SRT_SUCCESS: + srtcLog(srtc, LOG_NOTICE, "Link established %s: %s\n", + SRTC_CONNMODE_SERVER == srtc->rt.connmode ? "from" : "to", + srtconn_StrEndPoint(srtc, psaPeer, szEndPoint, sizeof(szEndPoint), NULL)); + srtc->rt.group_data[iLID].id = ns; + break; + case SRT_ENOSERVER: + case SRT_ECONNREJ: + //other cases? >>FIXME + iRejectReason = srt_getrejectreason(ns); + //no break, fall though + default: + { + int uerrc; + if (iRejectReason != SRT_REJ_UNKNOWN) { + srtcLog(srtc, LOG_NOTICE, "Link %d with %s failed: err=%d(%s),reason=%d(%s)\n", token, + srtconn_StrEndPoint(srtc, psaPeer, szEndPoint, sizeof(szEndPoint), NULL), + errorcode, srt_strerror(errorcode,0), + iRejectReason, srt_rejectreason_str(iRejectReason)); + }else + { + srtcLog(srtc, LOG_NOTICE, "Link %d with %s failed: err=%d(%s)\n", token, + srtconn_StrEndPoint(srtc, psaPeer, szEndPoint, sizeof(szEndPoint), NULL), + errorcode, srt_strerror(errorcode,0)); + } + //invalidate link socket to prevent get stats errlog + srtc->rt.group_data[iLID].id = SRT_INVALID_SOCK; + if (srtc->flags & MXPC_F_RECONNECT) { + SRT_SOCKGROUPCONFIG ssgcLink[1]; + char szEndPoint[SRTC_ENDPOINTLEN]; + //grab link config and reconnect +#if 0 // SRT_HAS_ISSUE1603 + /* >>work around github/srt issue #1603 + broken socket are deleted after being reported by srt_sendmsg2() and srt_recvmsg2() + and accumulate if no packet transit, which is the case before initial connection following + multiple failed connection attempts. + */ + if( SRT_ERROR == srt_close(ns)){ + srtcLog(srtc, LOG_ERR, "link %d str_close(%x) failed %s\n",iLID+1,ns, + srt_getlasterror_str()); + }else{ + srtcLog(srtc, LOG_DEBUG, "link %d str_close(%x) succeeded\n",iLID+1, ns); + } +#endif + /* Get link initial configuration to reconnect */ + memcpy(&ssgcLink[0], &srtc->cfg.group.ssgcLink[iLID], sizeof(ssgcLink[0])); + srtcLog(srtc, LOG_DEBUG, "str_connect_group for link %d: %s w:%hu\n", iLID+1, + srtconn_StrEndPoint(srtc, (struct sockaddr *)&ssgcLink[0].peeraddr, szEndPoint, sizeof(szEndPoint), NULL), + ssgcLink[0].weight); + if (SRT_ERROR == srt_connect_group(srtc->sock.u, ssgcLink, 1)) { + uerrc = srt_getlasterror(NULL); + switch(uerrc) { + case EINPROGRESS: + srtcLog(srtc, LOG_DEBUG, "EINPROGRESS:connecting group link %d: %s w:%hu\n", iLID+1, + srtconn_StrEndPoint(srtc, (struct sockaddr *)&ssgcLink[0].peeraddr, szEndPoint, sizeof(szEndPoint), NULL), + ssgcLink[0].weight); + break; + case SRT_ENOSERVER: + case SRT_ECONNREJ: + { + int rejreason = srt_getrejectreason(srtc->sock.u); + srtcLog(srtc, LOG_ERR, "could not connect, %s: %s\n", + uerrc == SRT_ENOSERVER ? "timed out" : "rejected", + srt_rejectreason_str(rejreason)); + } + break; + default: + srtcLog(srtc, LOG_ERR, "could not connect: %s\n", srt_getlasterror_str()); + break; + } + } else { + //>> Unlike srt_connect() SRT_SUCCESS(0) is returned instead of EINPROGRESS for non-blocking mode + srtcLog(srtc, LOG_DEBUG, "0:reconnecting group link %d: %s w:%hu\n", iLID+1, + srtconn_StrEndPoint(srtc, (struct sockaddr *)&ssgcLink[0].peeraddr, szEndPoint, sizeof(szEndPoint), NULL), + ssgcLink[0].weight); + } + } + } + break; + } + return; +} +#endif + +void TransmitMxptoolConnectCallback(void *srtcommon, SRTSOCKET ns, int errorcode, const struct sockaddr *psaPeer, int token) +{ + extern srt_logging::Logger applog; + SrtCommon* that = (SrtCommon*)srtcommon; + sockaddr_any peer(psaPeer); + applog.Debug("TransmitMxptoolConnectCallback: @", ns, " code=", errorcode, " adr=", peer.str(), " token=", token); + + int reject_reason = SRT_REJ_UNKNOWN; + + SrtCommon::Connection* this_node = nullptr; + if (!that->m_group_nodes.empty()) + { + // Find the token, or report invalid + for (auto& n: that->m_group_nodes) + { + if (n.token == -1) + { + applog.Debug("TransmitMxptoolConnectCallback: NODE WITH NO TOKEN: ", n.host, ":", n.port); + continue; + } + + if (n.token == token) + { + applog.Debug("TransmitMxptoolConnectCallback: FOUND NODE by: ", n.host, ":", n.port); + this_node = &n; + break; + } + } + } + + if (this_node == nullptr) + { + applog.Error("TransmitMxptoolConnectCallback: INVALID TOKEN: ", token); + return; + } + + if (errorcode == SRT_SUCCESS) + return; // connection successful - nothing to be done + + if (errorcode == SRT_ENOSERVER || errorcode == SRT_ECONNREJ) + reject_reason = srt_getrejectreason(ns); + + // don't overwrite so that the main reconnector remains unaware + //this_node->socket = SRT_INVALID_SOCK; + + const sockaddr* source = this_node->source.empty() ? nullptr : this_node->source.get(); + SRT_SOCKGROUPCONFIG gd = srt_prepare_endpoint(source, this_node->target.get(), this_node->target.size()); + gd.weight = this_node->weight; + gd.config = this_node->options; + gd.token = this_node->token; + + applog.Debug("TransmitMxptoolConnectCallback: connection ", reject_reason == SRT_REJ_UNKNOWN ? " failed. " : "rejected: ", + srt_rejectreason_str(reject_reason)); + + applog.Debug("TransmitMxptoolConnectCallback: will reconnect: [", + sockaddr_any(gd.srcaddr).str(), "] -> ", sockaddr_any(gd.peeraddr).str(), + " weight=", gd.weight); + + int st = srt_connect_group(that->m_sock, &gd, 1); + if (st == SRT_ERROR) + { + int errc = srt_getlasterror(nullptr); + applog.Error("TransmitMxptoolConnectCallback: ERROR code=", errc); + } + else + { + applog.Debug("TransmitMxptoolConnectCallback: Reconnection started successfully"); + this_node->socket = st; + } +} + + void TransmitGroupSocketConnect(void* srtcommon, SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token) { SrtCommon* that = (SrtCommon*)srtcommon; @@ -954,7 +1137,10 @@ void SrtCommon::OpenGroupClient() if (m_sock == -1) Error("srt_create_group"); - srt_connect_callback(m_sock, &TransmitGroupSocketConnect, this); + if (transmit_groupreconn_mxptool) + srt_connect_callback(m_sock, &TransmitMxptoolConnectCallback, this); + else + srt_connect_callback(m_sock, &TransmitGroupSocketConnect, this); int stat = -1; if (m_group_config != "") diff --git a/testing/testmedia.hpp b/testing/testmedia.hpp index ad70f6967..052d95b90 100644 --- a/testing/testmedia.hpp +++ b/testing/testmedia.hpp @@ -22,6 +22,7 @@ extern srt_listen_callback_fn* transmit_accept_hook_fn; extern void* transmit_accept_hook_op; +extern bool transmit_groupreconn_mxptool; extern std::shared_ptr transmit_stats_writer; @@ -50,6 +51,7 @@ class SrtCommon protected: friend void TransmitGroupSocketConnect(void* srtcommon, SRTSOCKET sock, int error, const sockaddr* peer, int token); + friend void TransmitMxptoolConnectCallback(void *srtcommon, SRTSOCKET ns, int errorcode, const struct sockaddr *psaPeer, int token); struct ConnectionBase {