diff --git a/srtcore/api.cpp b/srtcore/api.cpp index bb5dd64fe..c64fe1aa3 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -1989,6 +1989,42 @@ void srt::CUDTUnited::deleteGroup_LOCKED(CUDTGroup* g) int srt::CUDTUnited::close(CUDTSocket* s) { HLOGC(smlog.Debug, log << s->core().CONID() << "CLOSE. Acquiring control lock"); + // The check for whether m_pRcvQueue isn't NULL is safe enough; + // it can be NULL only after socket creation and without binding; + // once it's assigned, it's never reset to NULL, even when + // destroying the socket. + CUDT& e = s->core(); + if (e.m_pRcvQueue && e.m_bConnecting && !e.m_bConnected) + { + // Workaround for a design flaw. + // It's to work around the case when the socket is being + // closed in another thread while it's in the process of + // connecting in the blocking mode, that is, it runs the + // loop in `CUDT::startConnect` the whole time under the lock + // of CUDT::m_ConnectionLock and CUDTSocket::m_ControlLock, + // this way blocking the `srt_close` API call from continuing. + // We are setting the m_bClosing flag here prematurely so + // that the loop may check this flag periodically and exit + // immediately if it's set. + // + // The problem is that this flag shall NOT be set in case + // when you have a CONNECTED socket because not only is the + // blocking not a problem in this case, but setting the flag also + // puts the socket into a "confused" state in which it skips + // a vital part of closing itself and therefore runs an infinite + // loop when trying to purge the sender buffer of the closing + // socket. + // + // XXX Consider refactoring CUDT::startConnect: remove the + // connecting loop there and replace the "blocking mode specific" + // connecting procedure with delegation to the receiver queue, + // which will then be common with non-blocking mode, and synchronize + // the blocking through a CV. 
+ + e.m_bClosing = true; + e.m_pRcvQueue->kick(); + } + ScopedLock socket_cg(s->m_ControlLock); HLOGC(smlog.Debug, log << s->core().CONID() << "CLOSING (removing from listening, closing CUDT)"); diff --git a/srtcore/api.h b/srtcore/api.h index b5d6be915..48e7827f8 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -486,7 +486,11 @@ class CUDTUnited bool acquire(CUDTUnited& glob, CUDTSocket* s) { if (s == NULL) + { + socket = NULL; return false; + } + const bool caught = glob.acquireSocket(s); socket = caught ? s : NULL; return caught; diff --git a/srtcore/core.cpp b/srtcore/core.cpp index eca2b2069..7c0990954 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -3653,7 +3653,7 @@ void srt::CUDT::startConnect(const sockaddr_any& serv_addr, int32_t forced_isn) // We can't record this address yet until the cookie-confirmation is done, for safety reasons. sockaddr_any use_source_adr(serv_addr.family()); - while (!m_bClosing) + while (!m_bClosing && !m_bBroken) { const steady_clock::time_point local_tnow = steady_clock::now(); const steady_clock::duration tdiff = local_tnow - m_tsLastReqTime.load(); @@ -6862,7 +6862,7 @@ int srt::CUDT::sendmsg2(const char *data, int len, SRT_MSGCTRL& w_mctrl) << " DATA SIZE: " << size << " sched-SEQUENCE: " << seqno << " STAMP: " << BufferStamp(data, size)); - if (w_mctrl.srctime && w_mctrl.srctime < count_microseconds(m_stats.tsStartTime.time_since_epoch())) + if (w_mctrl.srctime && w_mctrl.srctime < count_microseconds(m_stats.tsStartTime.load().time_since_epoch())) { LOGC(aslog.Error, log << CONID() << "Wrong source time was provided. 
Sending is rejected."); @@ -7499,7 +7499,7 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous) const steady_clock::time_point currtime = steady_clock::now(); - perf->msTimeStamp = count_milliseconds(currtime - m_stats.tsStartTime); + perf->msTimeStamp = count_milliseconds(currtime - m_stats.tsStartTime.load()); perf->pktSent = m_stats.sndr.sent.trace.count(); perf->pktSentUnique = m_stats.sndr.sentUnique.trace.count(); perf->pktRecv = m_stats.rcvr.recvd.trace.count(); @@ -9594,7 +9594,7 @@ bool srt::CUDT::isRetransmissionAllowed(const time_point& tnow SRT_ATR_UNUSED) const int msNextUniqueToSend = count_milliseconds(tnow - tsNextPacket) + m_iPeerTsbPdDelay_ms; g_snd_logger.state.tsNow = tnow; - g_snd_logger.state.usElapsed = count_microseconds(tnow - m_stats.tsStartTime); + g_snd_logger.state.usElapsed = count_microseconds(tnow - m_stats.tsStartTime.load()); g_snd_logger.state.usSRTT = m_iSRTT; g_snd_logger.state.usRTTVar = m_iRTTVar; g_snd_logger.state.msSndBuffSpan = buffdelay_ms; @@ -10983,7 +10983,7 @@ int32_t srt::CUDT::bake(const sockaddr_any& addr, int32_t current_cookie, int co clientport, sizeof(clientport), NI_NUMERICHOST | NI_NUMERICSERV); - int64_t timestamp = (count_microseconds(steady_clock::now() - m_stats.tsStartTime) / 60000000) + distractor + + int64_t timestamp = (count_microseconds(steady_clock::now() - m_stats.tsStartTime.load()) / 60000000) + distractor + correction; // secret changes every one minute stringstream cookiestr; cookiestr << clienthost << ":" << clientport << ":" << timestamp; @@ -11869,7 +11869,7 @@ int64_t srt::CUDT::socketStartTime(SRTSOCKET u) if (!s) return APIError(MJ_NOTSUP, MN_SIDINVAL); - return count_microseconds(s->core().m_stats.tsStartTime.time_since_epoch()); + return count_microseconds(s->core().m_stats.tsStartTime.load().time_since_epoch()); } bool srt::CUDT::runAcceptHook(CUDT *acore, const sockaddr* peer, const CHandShake& hs, const CPacket& hspkt) diff --git a/srtcore/core.h 
b/srtcore/core.h index ed250c641..1c5b04008 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -1184,7 +1184,7 @@ class CUDT private: // Trace struct CoreStats { - time_point tsStartTime; // timestamp when the UDT entity is started + atomic_time_point tsStartTime; // timestamp when the UDT entity is started stats::Sender sndr; // sender statistics stats::Receiver rcvr; // receiver statistics diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 6cb4faeb1..729fc2293 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -1790,6 +1790,11 @@ srt::CUDT* srt::CRcvQueue::getNewEntry() return u; } +void srt::CRcvQueue::kick() +{ + CSync::lock_notify_all(m_BufferCond, m_BufferLock); +} + void srt::CRcvQueue::storePktClone(int32_t id, const CPacket& pkt) { CUniqueSync passcond(m_BufferLock, m_BufferCond); diff --git a/srtcore/queue.h b/srtcore/queue.h index 132b670b3..6c8dfdf32 100644 --- a/srtcore/queue.h +++ b/srtcore/queue.h @@ -553,6 +553,8 @@ class CRcvQueue void storePktClone(int32_t id, const CPacket& pkt); + void kick(); + private: sync::CSharedObjectPtr m_pListener; // pointer to the (unique, if any) listening UDT entity CRendezvousQueue* m_pRendezvousQueue; // The list of sockets in rendezvous mode diff --git a/test/test_common.cpp b/test/test_common.cpp index 1a8cca061..e705d3e82 100644 --- a/test/test_common.cpp +++ b/test/test_common.cpp @@ -1,6 +1,12 @@ #include #include +#include + +#include +#include +#include + #include "gtest/gtest.h" #include "test_env.h" #include "utilities.h" @@ -71,3 +77,79 @@ TEST(CIPAddress, IPv4_in_IPv6_pton) test_cipaddress_pton(peer_ip, AF_INET6, ip); } + + +TEST(SRTAPI, SyncRendezvousHangs) +{ + srt::TestInit srtinit; + int yes = 1; + + SRTSOCKET m_bindsock = srt_create_socket(); + ASSERT_NE(m_bindsock, SRT_ERROR); + + ASSERT_NE(srt_setsockopt(m_bindsock, 0, SRTO_TSBPDMODE, &yes, sizeof yes), SRT_ERROR); + ASSERT_NE(srt_setsockflag(m_bindsock, SRTO_SENDER, &yes, sizeof yes), SRT_ERROR); + 
ASSERT_EQ(srt_setsockopt(m_bindsock, 0, SRTO_RENDEZVOUS, &yes, sizeof yes), 0); + + const int connection_timeout_ms = 1000; // rendezvous timeout is x10 hence 10 seconds + ASSERT_EQ(srt_setsockopt(m_bindsock, 0, SRTO_CONNTIMEO, &connection_timeout_ms, sizeof connection_timeout_ms), 0); + + sockaddr_in local_sa={}; + local_sa.sin_family = AF_INET; + local_sa.sin_port = htons(9999); + local_sa.sin_addr.s_addr = INADDR_ANY; + + sockaddr_in peer_sa= {}; + peer_sa.sin_family = AF_INET; + peer_sa.sin_port = htons(9998); + ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &peer_sa.sin_addr), 1); + + uint64_t duration = 0; + + std::thread close_thread([&m_bindsock, &duration] { + std::this_thread::sleep_for(std::chrono::seconds(1)); // wait till srt_rendezvous is called + auto start = std::chrono::steady_clock::now(); + srt_close(m_bindsock); + auto end = std::chrono::steady_clock::now(); + + duration = std::chrono::duration_cast<std::chrono::seconds>(end - start).count(); + }); + + EXPECT_EQ(srt_rendezvous(m_bindsock, (sockaddr*)&local_sa, sizeof local_sa, + (sockaddr*)&peer_sa, sizeof peer_sa), SRT_ERROR); + + close_thread.join(); + ASSERT_LE(duration, 1); +} + +TEST(SRTAPI, RapidClose) +{ + ASSERT_EQ(srt_startup(), 0); + + SRTSOCKET sock = srt_create_socket(); + std::condition_variable cv_start; + std::mutex cvm; + sync::atomic<bool> started(false), ended(false); + + std::thread connect_thread([&sock, &cv_start, &cvm, &started, &ended] { + { std::unique_lock<std::mutex> guard(cvm); started = true; } // set under cvm so the waiter cannot miss the wakeup + cv_start.notify_one(); + + // Nonexistent address + sockaddr_any sa = CreateAddr("localhost", 5555, AF_INET); + srt_connect(sock, sa.get(), sa.size()); + // It doesn't matter if it succeeds. Important is that it exits. 
+ ended = true; + }); + + std::unique_lock<std::mutex> lk(cvm); + + // Wait until the thread surely starts + while (!started) + cv_start.wait(lk); + + srt_close(sock); + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + EXPECT_TRUE(ended); + connect_thread.join(); +} diff --git a/test/test_file_transmission.cpp b/test/test_file_transmission.cpp index bfd668ac7..61d3d6a20 100644 --- a/test/test_file_transmission.cpp +++ b/test/test_file_transmission.cpp @@ -18,6 +18,7 @@ #endif #include "srt.h" +#include "threadname.h" #include #include @@ -99,6 +100,7 @@ TEST(Transmission, FileUpload) auto client = std::thread([&] { + srt::ThreadName::set("TEST-in"); sockaddr_in remote; int len = sizeof remote; const SRTSOCKET accepted_sock = srt_accept(sock_lsn, (sockaddr*)&remote, &len);