From ebe2c7118fe23fcb171170913f59e9e1f0641d9b Mon Sep 17 00:00:00 2001 From: kageds <65413014+kageds@users.noreply.github.com> Date: Wed, 12 Jun 2024 14:37:07 +0100 Subject: [PATCH 1/8] [core] Fix build issues with ENFORCE_SRT_DEBUG_BONDING_STATES (#2948). --- srtcore/group.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srtcore/group.cpp b/srtcore/group.cpp index 0927d085a..3e57ef5df 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -2574,7 +2574,7 @@ class StabilityTracer str_tnow.replace(str_tnow.find(':'), 1, 1, '_'); } const std::string fname = "stability_trace_" + str_tnow + ".csv"; - m_fout.open(fname, std::ofstream::out); + m_fout.open(fname.c_str(), std::ofstream::out); if (!m_fout) std::cerr << "IPE: Failed to open " << fname << "!!!\n"; From 72303d7934f9c6b1cbe23c438672f0eba0f318cb Mon Sep 17 00:00:00 2001 From: Sektor van Skijlen Date: Wed, 12 Jun 2024 15:55:53 +0200 Subject: [PATCH 2/8] [core] Fixed bug: srt_accept failure may make accepted socket leak (#1884). Added unit test. --- srtcore/api.cpp | 43 ++++++++++++-------- srtcore/api.h | 2 +- test/test_connection_timeout.cpp | 67 ++++++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 18 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 56c581fec..ca26600d1 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -766,7 +766,7 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, enterCS(ls->m_AcceptLock); try { - ls->m_QueuedSockets.insert(ns->m_SocketID); + ls->m_QueuedSockets[ns->m_SocketID] = ns->m_PeerAddr; } catch (...) { @@ -1110,8 +1110,22 @@ SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int } else if (ls->m_QueuedSockets.size() > 0) { - set::iterator b = ls->m_QueuedSockets.begin(); - u = *b; + map::iterator b = ls->m_QueuedSockets.begin(); + + if (pw_addr != NULL && pw_addrlen != NULL) + { + // Check if the length of the buffer to fill the name in + // was large enough. + const int len = b->second.size(); + if (*pw_addrlen < len) + { + // In case when the address cannot be rewritten, + // DO NOT accept, but leave the socket in the queue. + throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); + } + } + + u = b->first; ls->m_QueuedSockets.erase(b); accepted = true; } @@ -1182,14 +1196,8 @@ SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int if (pw_addr != NULL && pw_addrlen != NULL) { - // Check if the length of the buffer to fill the name in - // was large enough. - const int len = s->m_PeerAddr.size(); - if (*pw_addrlen < len) - throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); - - memcpy((pw_addr), &s->m_PeerAddr, len); - *pw_addrlen = len; + memcpy((pw_addr), s->m_PeerAddr.get(), s->m_PeerAddr.size()); + *pw_addrlen = s->m_PeerAddr.size(); } return u; @@ -2751,23 +2759,24 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u) // if it is a listener, close all un-accepted sockets in its queue // and remove them later - for (set::iterator q = s->m_QueuedSockets.begin(); q != s->m_QueuedSockets.end(); ++q) + for (map::iterator q = s->m_QueuedSockets.begin(); + q != s->m_QueuedSockets.end(); ++ q) { - sockets_t::iterator si = m_Sockets.find(*q); + sockets_t::iterator si = m_Sockets.find(q->first); if (si == m_Sockets.end()) { // gone in the meantime LOGC(smlog.Error, - log << "removeSocket: IPE? socket @" << (*q) << " being queued for listener socket @" - << s->m_SocketID << " is GONE in the meantime ???"); + log << "removeSocket: IPE? socket @" << (q->first) << " being queued for listener socket @" + << s->m_SocketID << " is GONE in the meantime ???"); continue; } CUDTSocket* as = si->second; as->breakSocket_LOCKED(); - m_ClosedSockets[*q] = as; - m_Sockets.erase(*q); + m_ClosedSockets[q->first] = as; + m_Sockets.erase(q->first); } } diff --git a/srtcore/api.h b/srtcore/api.h index 9ba77d23a..fddbfc294 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -151,7 +151,7 @@ class CUDTSocket CUDT m_UDT; //< internal SRT socket logic public: - std::set m_QueuedSockets; //< set of connections waiting for accept() + std::map m_QueuedSockets; //< set of connections waiting for accept() sync::Condition m_AcceptCond; //< used to block "accept" call sync::Mutex m_AcceptLock; //< mutex associated to m_AcceptCond diff --git a/test/test_connection_timeout.cpp b/test/test_connection_timeout.cpp index 0b8bb7874..dca7595b8 100644 --- a/test/test_connection_timeout.cpp +++ b/test/test_connection_timeout.cpp @@ -1,4 +1,5 @@ #include +#include #include #include "test_env.h" @@ -12,6 +13,7 @@ typedef int SOCKET; #include"platform_sys.h" #include "srt.h" +#include "netinet_any.h" using namespace std; @@ -204,3 +206,68 @@ TEST_F(TestConnectionTimeout, BlockingLoop) } +TEST(TestConnectionAPI, Accept) +{ + using namespace std::chrono; + using namespace srt; + + srt_startup(); + + const SRTSOCKET caller_sock = srt_create_socket(); + const SRTSOCKET listener_sock = srt_create_socket(); + + const int eidl = srt_epoll_create(); + const int eidc = srt_epoll_create(); + const int ev_conn = SRT_EPOLL_OUT | SRT_EPOLL_ERR; + srt_epoll_add_usock(eidc, caller_sock, &ev_conn); + const int ev_acp = SRT_EPOLL_IN | SRT_EPOLL_ERR; + srt_epoll_add_usock(eidl, listener_sock, &ev_acp); + + sockaddr_any sa = srt::CreateAddr("localhost", 5555, AF_INET); + + ASSERT_NE(srt_bind(listener_sock, sa.get(), sa.size()), -1); + ASSERT_NE(srt_listen(listener_sock, 1), -1); + + // Set non-blocking mode so that you can wait for readiness + bool no = false; + srt_setsockflag(caller_sock, SRTO_RCVSYN, &no, sizeof no); + srt_setsockflag(listener_sock, SRTO_RCVSYN, &no, sizeof no); + + srt_connect(caller_sock, sa.get(), sa.size()); + + SRT_EPOLL_EVENT ready[2]; + int nready = srt_epoll_uwait(eidl, ready, 2, 1000); // Wait 1s + EXPECT_EQ(nready, 1); + EXPECT_EQ(ready[0].fd, listener_sock); + // EXPECT_EQ(ready[0].events, SRT_EPOLL_IN); + + // Now call the accept function incorrectly + int size = 0; + sockaddr_storage saf; + + EXPECT_EQ(srt_accept(listener_sock, (sockaddr*)&saf, &size), SRT_ERROR); + + std::this_thread::sleep_for(seconds(1)); + + // Set correctly + size = sizeof (sockaddr_in6); + EXPECT_NE(srt_accept(listener_sock, (sockaddr*)&saf, &size), SRT_ERROR); + + // Ended up with error, but now you should also expect error on the caller side. + + // Wait 5s until you get a connection broken. + nready = srt_epoll_uwait(eidc, ready, 2, 5000); + EXPECT_EQ(nready, 1); + if (nready == 1) + { + // Do extra checks only if you know that this was returned. + EXPECT_EQ(ready[0].fd, caller_sock); + EXPECT_EQ(ready[0].events & SRT_EPOLL_ERR, 0); + } + srt_close(caller_sock); + srt_close(listener_sock); + + srt_cleanup(); +} + + From b7c8050aa11b4afb41b92521d25ffa75133cceeb Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Wed, 26 Jun 2024 13:10:13 +0200 Subject: [PATCH 3/8] [core] Fixed missing traceBelatedTime initialization. --- srtcore/core.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 1612830e7..faf9c26ab 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -929,6 +929,7 @@ void srt::CUDT::clearData() m_stats.tsLastSampleTime = steady_clock::now(); m_stats.traceReorderDistance = 0; + m_stats.traceBelatedTime = 0; m_stats.sndDuration = m_stats.m_sndDurationTotal = 0; } From 36260c395b6f65182e67ba19d776d875836c6ccb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Wed, 26 Jun 2024 17:38:29 +0200 Subject: [PATCH 4/8] [core] Removed settable ability for SRTO_VERSION --- srtcore/socketconfig.cpp | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/srtcore/socketconfig.cpp b/srtcore/socketconfig.cpp index d44330f78..1c067d059 100644 --- a/srtcore/socketconfig.cpp +++ b/srtcore/socketconfig.cpp @@ -559,15 +559,6 @@ struct CSrtConfigSetter } }; -template<> -struct CSrtConfigSetter -{ - static void set(CSrtConfig& co, const void* optval, int optlen) - { - co.uSrtVersion = cast_optval(optval, optlen); - } -}; - template<> struct CSrtConfigSetter { @@ -971,7 +962,6 @@ int dispatchSet(SRT_SOCKOPT optName, CSrtConfig& co, const void* optval, int opt DISPATCH(SRTO_CONNTIMEO); DISPATCH(SRTO_DRIFTTRACER); DISPATCH(SRTO_LOSSMAXTTL); - DISPATCH(SRTO_VERSION); DISPATCH(SRTO_MINVERSION); DISPATCH(SRTO_STREAMID); DISPATCH(SRTO_CONGESTION); From 5e6e80b525f6bc2b96f24fbef2143a5a51e59a7c Mon Sep 17 00:00:00 2001 From: Mikolaj Malecki Date: Wed, 26 Jun 2024 15:15:23 +0200 Subject: [PATCH 5/8] [core] Fixed broadcast group: cut failed links on partial sending success --- srtcore/group.cpp | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/srtcore/group.cpp b/srtcore/group.cpp index 3e57ef5df..d2d275bf6 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -1501,7 +1501,10 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) int ercode = 0; - if (was_blocked) + // This block causes waiting for any socket to accept the payload. + // This should be done only in blocking mode and only if no other socket + // accepted the payload. + if (was_blocked && none_succeeded && m_bSynSending) { m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false); if (!m_bSynSending) @@ -1648,6 +1651,19 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc) throw CUDTException(major, minor, 0); } + for (vector::iterator is = sendstates.begin(); is != sendstates.end(); ++is) + { + // Here we have a situation that at least 1 link successfully sent a packet. + // All links for which sending has failed must be closed. + if (is->stat == -1) + { + // This only sets the state to the socket; the GC process should + // pick it up at the next time. + HLOGC(gslog.Debug, log << "grp/sendBroadcast: per PARTIAL SUCCESS, closing failed @" << is->id); + is->mb->ps->setBrokenClosed(); + } + } + // Now that at least one link has succeeded, update sending stats. m_stats.sent.count(len); From 0680092fe6aef1c912f8403ffaf4a2baf8e8db34 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Wed, 3 Jul 2024 14:39:42 +0200 Subject: [PATCH 6/8] [core] Removed a wrong assertion about ACK timestamp. --- srtcore/core.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index faf9c26ab..b7f2723c2 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -8044,7 +8044,6 @@ bool srt::CUDT::getFirstNoncontSequence(int32_t& w_seq, string& w_log_reason) int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) { - SRT_ASSERT(ctrlpkt.getMsgTimeStamp() != 0); int nbsent = 0; int local_prevack = 0; #if ENABLE_HEAVY_LOGGING From 54c002f6ad9e3643d5d8117d32c5dada9944c151 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Tue, 9 Jul 2024 12:27:43 +0200 Subject: [PATCH 7/8] [core] Fix TSBPD thread create/join protection. Co-authored-by: Sektor van Skijlen --- srtcore/core.cpp | 9 +++++---- srtcore/core.h | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index b7f2723c2..83ff52f16 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -9992,12 +9992,13 @@ bool srt::CUDT::overrideSndSeqNo(int32_t seq) int srt::CUDT::checkLazySpawnTsbPdThread() { const bool need_tsbpd = m_bTsbPd || m_bGroupTsbPd; + if (!need_tsbpd) + return 0; - if (need_tsbpd && !m_RcvTsbPdThread.joinable()) + ScopedLock lock(m_RcvTsbPdStartupLock); + if (!m_RcvTsbPdThread.joinable()) { - ScopedLock lock(m_RcvTsbPdStartupLock); - - if (m_bClosing) // Check again to protect join() in CUDT::releaseSync() + if (m_bClosing) // Check m_bClosing to protect join() in CUDT::releaseSync(). return -1; HLOGP(qrlog.Debug, "Spawning Socket TSBPD thread"); diff --git a/srtcore/core.h b/srtcore/core.h index e24bd8152..3935b99d3 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -995,7 +995,7 @@ class CUDT sync::CThread m_RcvTsbPdThread; // Rcv TsbPD Thread handle sync::Condition m_RcvTsbPdCond; // TSBPD signals if reading is ready. Use together with m_RecvLock bool m_bTsbPdNeedsWakeup; // Signal TsbPd thread to wake up on RCV buffer state change. - sync::Mutex m_RcvTsbPdStartupLock; // Protects TSBPD thread creating and joining + sync::Mutex m_RcvTsbPdStartupLock; // Protects TSBPD thread creation and joining. CallbackHolder m_cbAcceptHook; CallbackHolder m_cbConnectHook; From 84d18ec9d1ef9e1bb37133d07151d59e92b4295e Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Tue, 9 Jul 2024 17:09:34 +0200 Subject: [PATCH 8/8] [core] Added missing SRT_ATTR_GUARDED_BY(m_GlobControlLock). Removed unused m_MultiplexerLock. --- srtcore/api.cpp | 5 +---- srtcore/api.h | 20 +++++++++++++++----- srtcore/cache.h | 10 ++++++---- srtcore/core.cpp | 2 +- srtcore/group.cpp | 2 +- 5 files changed, 24 insertions(+), 15 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index ca26600d1..665593c39 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -174,8 +174,7 @@ srt::CUDTUnited::CUDTUnited() , m_GlobControlLock() , m_IDLock() , m_mMultiplexer() - , m_MultiplexerLock() - , m_pCache(NULL) + , m_pCache(new CCache) , m_bClosing(false) , m_GCStopCond() , m_InitLock() @@ -195,8 +194,6 @@ srt::CUDTUnited::CUDTUnited() setupMutex(m_GlobControlLock, "GlobControl"); setupMutex(m_IDLock, "ID"); setupMutex(m_InitLock, "Init"); - - m_pCache = new CCache; } srt::CUDTUnited::~CUDTUnited() diff --git a/srtcore/api.h b/srtcore/api.h index fddbfc294..6dbad6634 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -385,11 +385,13 @@ class CUDTUnited private: typedef std::map sockets_t; // stores all the socket structures - sockets_t m_Sockets; + SRT_ATTR_GUARDED_BY(m_GlobControlLock) + sockets_t m_Sockets; #if ENABLE_BONDING typedef std::map groups_t; - groups_t m_Groups; + SRT_ATTR_GUARDED_BY(m_GlobControlLock) + groups_t m_Groups; #endif sync::Mutex m_GlobControlLock; // used to synchronize UDT API @@ -399,6 +401,7 @@ class CUDTUnited SRTSOCKET m_SocketIDGenerator; // seed to generate a new unique socket ID SRTSOCKET m_SocketIDGenerator_init; // Keeps track of the very first one + SRT_ATTR_GUARDED_BY(m_GlobControlLock) std::map > m_PeerRec; // record sockets from peers to avoid repeated connection request, int64_t = (socker_id << 30) + isn @@ -460,11 +463,13 @@ class CUDTUnited const sockaddr_any& reqaddr, const CSrtMuxerConfig& cfgSocket); private: + SRT_ATTR_GUARDED_BY(m_GlobControlLock) std::map m_mMultiplexer; // UDP multiplexer - sync::Mutex m_MultiplexerLock; -private: - CCache* m_pCache; // UDT network information cache + /// UDT network information cache. + /// Existence is guarded by m_GlobControlLock, but the cache itself is thread-safe. + SRT_ATTR_GUARDED_BY(m_GlobControlLock) + CCache* const m_pCache; private: srt::sync::atomic m_bClosing; @@ -472,14 +477,19 @@ class CUDTUnited sync::Condition m_GCStopCond; sync::Mutex m_InitLock; + SRT_ATTR_GUARDED_BY(m_InitLock) int m_iInstanceCount; // number of startup() called by application + SRT_ATTR_GUARDED_BY(m_InitLock) bool m_bGCStatus; // if the GC thread is working (true) + SRT_ATTR_GUARDED_BY(m_InitLock) sync::CThread m_GCThread; static void* garbageCollect(void*); + SRT_ATTR_GUARDED_BY(m_GlobControlLock) sockets_t m_ClosedSockets; // temporarily store closed sockets #if ENABLE_BONDING + SRT_ATTR_GUARDED_BY(m_GlobControlLock) groups_t m_ClosedGroups; #endif diff --git a/srtcore/cache.h b/srtcore/cache.h index 47633706a..d5a037633 100644 --- a/srtcore/cache.h +++ b/srtcore/cache.h @@ -192,9 +192,11 @@ template class CCache return 0; } - /// Specify the cache size (i.e., max number of items). - /// @param [in] size max cache size. +private: + /// Specify the cache size (i.e., max number of items). + /// Private or else must be protected by a lock. + /// @param [in] size max cache size. void setSizeLimit(int size) { m_iMaxSize = size; @@ -202,8 +204,8 @@ template class CCache m_vHashPtr.resize(m_iHashSize); } - /// Clear all entries in the cache, restore to initialization state. - + /// Clear all entries in the cache, restore to initialization state. + /// Private or else must be protected by a lock. void clear() { for (typename std::list::iterator i = m_StorageList.begin(); i != m_StorageList.end(); ++ i) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 83ff52f16..b802bdec9 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -11946,7 +11946,7 @@ void srt::CUDT::processKeepalive(const CPacket& ctrlpkt, const time_point& tsArr if (m_parent->m_GroupOf) { // Lock GlobControlLock in order to make sure that - // the state if the socket having the group and the + // the state of the socket having the group and the // existence of the group will not be changed during // the operation. The attempt of group deletion will // have to wait until this operation completes. diff --git a/srtcore/group.cpp b/srtcore/group.cpp index d2d275bf6..d4598d7c1 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -3199,7 +3199,7 @@ void CUDTGroup::send_CloseBrokenSockets(vector& w_wipeme) InvertedLock ug(m_GroupLock); // With unlocked GroupLock, we can now lock GlobControlLock. - // This is needed prevent any of them be deleted from the container + // This is needed to prevent any of them deleted from the container // at the same time. ScopedLock globlock(CUDT::uglobal().m_GlobControlLock);