From 3f6e6acb11d9ace119cdbbd45e396390638c2a7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Tue, 1 Jun 2021 17:32:37 +0200 Subject: [PATCH 1/5] [core] Fixed: closing socket should mark and signal so that srt_connect call can exit immediately --- srtcore/api.cpp | 8 ++++++++ srtcore/core.cpp | 6 ++++++ srtcore/queue.cpp | 5 +++++ srtcore/queue.h | 2 ++ 4 files changed, 21 insertions(+) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index aee13f389..ab57fa42b 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -1964,6 +1964,14 @@ int srt::CUDTUnited::close(CUDTSocket* s) { HLOGC(smlog.Debug, log << s->m_pUDT->CONID() << " CLOSE. Acquiring control lock"); + // This socket might be currently during reading from + // the receiver queue as called from `srt_connect` API. + // Until this procedure breaks, locking s->m_ControlLock + // would have to wait. Mark it closing right now and force + // the receiver queue to stop waiting immediately. + s->m_pUDT->m_bClosing = true; + s->m_pUDT->m_pRcvQueue->kick(); + ScopedLock socket_cg(s->m_ControlLock); HLOGC(smlog.Debug, log << s->m_pUDT->CONID() << " CLOSING (removing from listening, closing CUDT)"); diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 7a337cf41..e42a276f3 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -3659,6 +3659,12 @@ void srt::CUDT::startConnect(const sockaddr_any& serv_addr, int32_t forced_isn) // listener should respond with HS_VERSION_SRT1, if it is HSv5 capable. } + // The queue could have been kicked by the close() API call, + // if so, interrupt immediately. 
+ if (m_bClosing || m_bBroken) + break; + + HLOGC(cnlog.Debug, log << "startConnect: timeout from Q:recvfrom, looping again; cst=" << ConnectStatusStr(cst)); diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index b9d7ed02e..fa135619e 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -1758,6 +1758,11 @@ srt::CUDT* srt::CRcvQueue::getNewEntry() return u; } +void srt::CRcvQueue::kick() +{ + CSync::lock_broadcast(m_BufferCond, m_BufferLock); +} + void srt::CRcvQueue::storePkt(int32_t id, CPacket* pkt) { UniqueLock bufferlock(m_BufferLock); diff --git a/srtcore/queue.h b/srtcore/queue.h index ee05440c8..a4460ba5d 100644 --- a/srtcore/queue.h +++ b/srtcore/queue.h @@ -566,6 +566,8 @@ class CRcvQueue void storePkt(int32_t id, CPacket* pkt); + void kick(); + private: sync::Mutex m_LSLock; CUDT* m_pListener; // pointer to the (unique, if any) listening UDT entity From 7164030f4f2f4ff4e5a9adf5f5ac5853a8b13b44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Wed, 2 Jun 2021 11:14:28 +0200 Subject: [PATCH 2/5] Fixed problem: test if socket is in this blocking-connecting state before changing the flag. Otherwise it would confuse the closing function when used on a connected socket --- srtcore/api.cpp | 42 +++++++++++++++++++++++++++------ test/test_file_transmission.cpp | 9 ++++++- 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index ab57fa42b..2b77256b5 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -1964,13 +1964,41 @@ int srt::CUDTUnited::close(CUDTSocket* s) { HLOGC(smlog.Debug, log << s->m_pUDT->CONID() << " CLOSE. Acquiring control lock"); - // This socket might be currently during reading from - // the receiver queue as called from `srt_connect` API. - // Until this procedure breaks, locking s->m_ControlLock - // would have to wait. Mark it closing right now and force - // the receiver queue to stop waiting immediately. 
- s->m_pUDT->m_bClosing = true; - s->m_pUDT->m_pRcvQueue->kick(); + // The check for whether m_pRcvQueue isn't NULL is safe enough; + // it can be NULL only after socket creation and before binding, + // and once it's assigned, it's never reset to NULL, even when + // destroying the socket. + CUDT* e = s->m_pUDT; + if (e->m_pRcvQueue && e->m_bConnecting && !e->m_bConnected) + { + // Workaround for a design flaw. + // It's to work around the case when the socket is being + // closed in another thread while it's in the process of + // connecting in the blocking mode, that is, it runs the + // loop in `CUDT::startConnect` the whole time under the locks + // of CUDT::m_ConnectionLock and CUDTSocket::m_ControlLock, + // this way blocking the `srt_close` API call from continuing. + // Here we set the m_bClosing flag prematurely so + // that the loop may check this flag periodically and exit + // immediately if it's set. + // + // The problem is that this flag shall NOT be set in case + // when you have a CONNECTED socket, because not only is the + // blocking not a problem in this case, but setting the flag also + // puts the socket into a "confused" state in which it skips + // a vital part of closing itself and therefore runs an infinite + // loop when trying to purge the sender buffer of the closing + // socket. + // + // XXX Consider refactoring CUDT::startConnect: remove the + // connecting loop there and replace the "blocking mode specific" + // connecting procedure with delegation to the receiver queue, + // which will then be common with non-blocking mode, and synchronize + // the blocking through a CV. 
+ + e->m_bClosing = true; + e->m_pRcvQueue->kick(); + } ScopedLock socket_cg(s->m_ControlLock); diff --git a/test/test_file_transmission.cpp b/test/test_file_transmission.cpp index 790555eb7..21a920741 100644 --- a/test/test_file_transmission.cpp +++ b/test/test_file_transmission.cpp @@ -17,6 +17,7 @@ #endif #include "srt.h" +#include "threadname.h" #include #include @@ -74,6 +75,7 @@ TEST(Transmission, FileUpload) auto client = std::thread([&] { + ThreadName::set("TEST-in"); sockaddr_in remote; int len = sizeof remote; const SRTSOCKET accepted_sock = srt_accept(sock_lsn, (sockaddr*)&remote, &len); @@ -93,7 +95,12 @@ TEST(Transmission, FileUpload) for (;;) { int n = srt_recv(accepted_sock, buf.data(), 1456); - ASSERT_NE(n, SRT_ERROR); + EXPECT_NE(n, SRT_ERROR); + if (n == -1) + { + std::cerr << "UNEXPECTED ERROR: " << srt_getlasterror_str() << std::endl; + break; + } if (n == 0) { break; From cdca85d892a34776e2227d8c3338d780541b631b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Wed, 2 Jun 2021 11:25:44 +0200 Subject: [PATCH 3/5] Added UT for the closing case --- test/test_common.cpp | 51 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/test/test_common.cpp b/test/test_common.cpp index a4b8c033a..992b93893 100644 --- a/test/test_common.cpp +++ b/test/test_common.cpp @@ -1,6 +1,11 @@ #include #include +#include + +#include +#include + #include "gtest/gtest.h" #include "utilities.h" #include "common.h" @@ -65,3 +70,49 @@ TEST(CIPAddress, IPv4_in_IPv6_pton) test_cipaddress_pton(peer_ip, AF_INET6, ip); } + + +TEST(SRTAPI, SyncRendezvousHangs) { + ASSERT_EQ(srt_startup(), 0); + + int yes = 1; + + SRTSOCKET m_bindsock = srt_create_socket(); + ASSERT_NE(m_bindsock, SRT_ERROR); + + ASSERT_NE(srt_setsockopt(m_bindsock, 0, SRTO_TSBPDMODE, &yes, sizeof yes), SRT_ERROR); + ASSERT_NE(srt_setsockflag(m_bindsock, SRTO_SENDER, &yes, sizeof yes), SRT_ERROR); + ASSERT_EQ(srt_setsockopt(m_bindsock, 0, 
SRTO_RENDEZVOUS, &yes, sizeof yes), 0); + + const int connection_timeout_ms = 1000; // rendezvous timeout is x10, hence 10 seconds + ASSERT_EQ(srt_setsockopt(m_bindsock, 0, SRTO_CONNTIMEO, &connection_timeout_ms, sizeof connection_timeout_ms), 0); + + sockaddr_in local_sa={}; + local_sa.sin_family = AF_INET; + local_sa.sin_port = htons(9999); + local_sa.sin_addr.s_addr = INADDR_ANY; + + sockaddr_in peer_sa= {}; + peer_sa.sin_family = AF_INET; + peer_sa.sin_port = htons(9998); + ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &peer_sa.sin_addr), 1); + + uint64_t duration = 0; + + std::thread close_thread([&m_bindsock, &duration] { + std::this_thread::sleep_for(std::chrono::seconds(1)); // wait till srt_rendezvous is called + auto start = std::chrono::steady_clock::now(); + srt_close(m_bindsock); + auto end = std::chrono::steady_clock::now(); + + duration = std::chrono::duration_cast(end - start).count(); + }); + + ASSERT_EQ(srt_rendezvous(m_bindsock, (sockaddr*)&local_sa, sizeof local_sa, + (sockaddr*)&peer_sa, sizeof peer_sa), SRT_ERROR); + + close_thread.join(); + ASSERT_LE(duration, 1); + srt_close(m_bindsock); + srt_cleanup(); +} From 50da65233820e851d3f72e9245be7e0cfb25ebf0 Mon Sep 17 00:00:00 2001 From: Mikolaj Malecki Date: Mon, 26 Aug 2024 14:36:07 +0200 Subject: [PATCH 4/5] Fixed unnecessary condition. Added more atomics (data race fix) --- srtcore/api.h | 4 ++++ srtcore/core.cpp | 12 +++--------- srtcore/core.h | 2 +- test/test_common.cpp | 2 +- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/srtcore/api.h b/srtcore/api.h index b5d6be915..48e7827f8 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -486,7 +486,11 @@ class CUDTUnited bool acquire(CUDTUnited& glob, CUDTSocket* s) { if (s == NULL) + { + socket = NULL; return false; + } + const bool caught = glob.acquireSocket(s); socket = caught ? 
s : NULL; return caught; diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 0e324e5fe..bfdbd94f9 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -3653,7 +3653,7 @@ void srt::CUDT::startConnect(const sockaddr_any& serv_addr, int32_t forced_isn) // We can't record this address yet until the cookie-confirmation is done, for safety reasons. sockaddr_any use_source_adr(serv_addr.family()); - while (!m_bClosing) + while (!m_bClosing && !m_bBroken) { const steady_clock::time_point local_tnow = steady_clock::now(); const steady_clock::duration tdiff = local_tnow - m_tsLastReqTime.load(); @@ -3815,12 +3815,6 @@ void srt::CUDT::startConnect(const sockaddr_any& serv_addr, int32_t forced_isn) // listener should respond with HS_VERSION_SRT1, if it is HSv5 capable. } - // The queue could have been kicked by the close() API call, - // if so, interrupt immediately. - if (m_bClosing || m_bBroken) - break; - - HLOGC(cnlog.Debug, log << CONID() << "startConnect: timeout from Q:recvfrom, looping again; cst=" << ConnectStatusStr(cst)); @@ -6868,7 +6862,7 @@ int srt::CUDT::sendmsg2(const char *data, int len, SRT_MSGCTRL& w_mctrl) << " DATA SIZE: " << size << " sched-SEQUENCE: " << seqno << " STAMP: " << BufferStamp(data, size)); - if (w_mctrl.srctime && w_mctrl.srctime < count_microseconds(m_stats.tsStartTime.time_since_epoch())) + if (w_mctrl.srctime && w_mctrl.srctime < count_microseconds(m_stats.tsStartTime.load().time_since_epoch())) { LOGC(aslog.Error, log << CONID() << "Wrong source time was provided. 
Sending is rejected."); @@ -11875,7 +11869,7 @@ int64_t srt::CUDT::socketStartTime(SRTSOCKET u) if (!s) return APIError(MJ_NOTSUP, MN_SIDINVAL); - return count_microseconds(s->core().m_stats.tsStartTime.time_since_epoch()); + return count_microseconds(s->core().m_stats.tsStartTime.load().time_since_epoch()); } bool srt::CUDT::runAcceptHook(CUDT *acore, const sockaddr* peer, const CHandShake& hs, const CPacket& hspkt) diff --git a/srtcore/core.h b/srtcore/core.h index ed250c641..1c5b04008 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -1184,7 +1184,7 @@ class CUDT private: // Trace struct CoreStats { - time_point tsStartTime; // timestamp when the UDT entity is started + atomic_time_point tsStartTime; // timestamp when the UDT entity is started stats::Sender sndr; // sender statistics stats::Receiver rcvr; // receiver statistics diff --git a/test/test_common.cpp b/test/test_common.cpp index 8d566b359..e705d3e82 100644 --- a/test/test_common.cpp +++ b/test/test_common.cpp @@ -129,7 +129,7 @@ TEST(SRTAPI, RapidClose) SRTSOCKET sock = srt_create_socket(); std::condition_variable cv_start; std::mutex cvm; - bool started = false, ended = false; + sync::atomic started(false), ended(false); std::thread connect_thread([&sock, &cv_start, &started, &ended] { started = true; From 0d78572e79a5d1f76d798d94f7f9376818430661 Mon Sep 17 00:00:00 2001 From: Mikolaj Malecki Date: Mon, 26 Aug 2024 14:52:06 +0200 Subject: [PATCH 5/5] Fixes for atomic field --- srtcore/core.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index bfdbd94f9..7c0990954 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -7499,7 +7499,7 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous) const steady_clock::time_point currtime = steady_clock::now(); - perf->msTimeStamp = count_milliseconds(currtime - m_stats.tsStartTime); + perf->msTimeStamp = count_milliseconds(currtime - m_stats.tsStartTime.load()); 
perf->pktSent = m_stats.sndr.sent.trace.count(); perf->pktSentUnique = m_stats.sndr.sentUnique.trace.count(); perf->pktRecv = m_stats.rcvr.recvd.trace.count(); @@ -9594,7 +9594,7 @@ bool srt::CUDT::isRetransmissionAllowed(const time_point& tnow SRT_ATR_UNUSED) const int msNextUniqueToSend = count_milliseconds(tnow - tsNextPacket) + m_iPeerTsbPdDelay_ms; g_snd_logger.state.tsNow = tnow; - g_snd_logger.state.usElapsed = count_microseconds(tnow - m_stats.tsStartTime); + g_snd_logger.state.usElapsed = count_microseconds(tnow - m_stats.tsStartTime.load()); g_snd_logger.state.usSRTT = m_iSRTT; g_snd_logger.state.usRTTVar = m_iRTTVar; g_snd_logger.state.msSndBuffSpan = buffdelay_ms; @@ -10983,7 +10983,7 @@ int32_t srt::CUDT::bake(const sockaddr_any& addr, int32_t current_cookie, int co clientport, sizeof(clientport), NI_NUMERICHOST | NI_NUMERICSERV); - int64_t timestamp = (count_microseconds(steady_clock::now() - m_stats.tsStartTime) / 60000000) + distractor + + int64_t timestamp = (count_microseconds(steady_clock::now() - m_stats.tsStartTime.load()) / 60000000) + distractor + correction; // secret changes every one minute stringstream cookiestr; cookiestr << clienthost << ":" << clientport << ":" << timestamp;