From e86b2203b8b036f4abf40b9683dea04ee95a9a7d Mon Sep 17 00:00:00 2001 From: chengang06 Date: Fri, 5 Mar 2021 11:11:10 +0800 Subject: [PATCH 01/11] fix epoll event loss problem --- srtcore/core.cpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 2d5cb2d86..02f796fb1 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -7553,7 +7553,19 @@ int CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) CSync::lock_signal(m_RecvDataCond, m_RecvLock); } // acknowledge any waiting epolls to read + // fix SRT_EPOLL_IN event loss but rcvbuffer still have data: + // 1. user call receive/receivemessage(about line number:6482) + // 2. after read/receive, if rcvbuffer is empty, will set SRT_EPOLL_IN event to false + // 3. but if we do not do some lock work here, will cause some sync problems between threads: + // (1) user thread: call receive/receivemessage + // (2) user thread: read data + // (3) user thread: no data in rcvbuffer, set SRT_EPOLL_IN event to false + // (4) receive thread: receive data and set SRT_EPOLL_IN to true + // (5) user thread: set SRT_EPOLL_IN to false + // 4. so , m_RecvLock must be used here to protect epoll event + enterCS(m_RecvLock); s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, true); + leaveCS(m_RecvLock); #if ENABLE_EXPERIMENTAL_BONDING if (m_parent->m_GroupOf) { From a3d0b6ed8193ef9143f7bf76455dca620f235a22 Mon Sep 17 00:00:00 2001 From: chengang06 Date: Tue, 9 Mar 2021 10:35:32 +0800 Subject: [PATCH 02/11] fix epoll event loss problem, use elegant lock method --- srtcore/core.cpp | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 87e34cbfd..c2d4caedf 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -7554,25 +7554,27 @@ int CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) } else { - if (m_config.bSynRecving) - { - // signal a waiting "recv" call if there is any data available - CSync::lock_signal(m_RecvDataCond, m_RecvLock); - } - // acknowledge any waiting epolls to read - // fix SRT_EPOLL_IN event loss but rcvbuffer still have data: - // 1. user call receive/receivemessage(about line number:6482) - // 2. after read/receive, if rcvbuffer is empty, will set SRT_EPOLL_IN event to false - // 3. but if we do not do some lock work here, will cause some sync problems between threads: - // (1) user thread: call receive/receivemessage - // (2) user thread: read data - // (3) user thread: no data in rcvbuffer, set SRT_EPOLL_IN event to false - // (4) receive thread: receive data and set SRT_EPOLL_IN to true - // (5) user thread: set SRT_EPOLL_IN to false - // 4. so , m_RecvLock must be used here to protect epoll event - enterCS(m_RecvLock); - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, true); - leaveCS(m_RecvLock); + { + UniqueLock rdlock (m_RecvLock); + CSync rdcond (m_RecvDataCond, rdlock); + if (m_config.bSynRecving) + { + // signal a waiting "recv" call if there is any data available + rdcond.signal_locked(rdlock); + } + // acknowledge any waiting epolls to read + // fix SRT_EPOLL_IN event loss but rcvbuffer still have data: + // 1. user call receive/receivemessage(about line number:6482) + // 2. after read/receive, if rcvbuffer is empty, will set SRT_EPOLL_IN event to false + // 3. but if we do not do some lock work here, will cause some sync problems between threads: + // (1) user thread: call receive/receivemessage + // (2) user thread: read data + // (3) user thread: no data in rcvbuffer, set SRT_EPOLL_IN event to false + // (4) receive thread: receive data and set SRT_EPOLL_IN to true + // (5) user thread: set SRT_EPOLL_IN to false + // 4. so , m_RecvLock must be used here to protect epoll event + s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, true); + } #if ENABLE_EXPERIMENTAL_BONDING if (m_parent->m_GroupOf) { From 05cb7560db9a9d92e359c557e1023a0f9ff8a307 Mon Sep 17 00:00:00 2001 From: chengang06 Date: Wed, 12 May 2021 14:40:34 +0800 Subject: [PATCH 03/11] fix band width limitation problem --- srtcore/core.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index efc866164..f789c921d 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -10850,7 +10850,7 @@ void CUDT::checkRexmitTimer(const steady_clock::time_point& currtime) */ const uint64_t rtt_syn = (m_iSRTT + 4 * m_iRTTVar + 2 * COMM_SYN_INTERVAL_US); - const uint64_t exp_int_us = (m_iReXmitCount * rtt_syn + COMM_SYN_INTERVAL_US); + const uint64_t exp_int_us = (m_iReXmitCount * rtt_syn + COMM_SYN_INTERVAL_US + count_microseconds(m_tdSendInterval)); if (currtime <= (m_tsLastRspAckTime + microseconds_from(exp_int_us))) return; From 1b5da233fafe0e775fda2fd9f6a49d6fc3657d79 Mon Sep 17 00:00:00 2001 From: chengang06 Date: Wed, 20 Oct 2021 14:48:37 +0800 Subject: [PATCH 04/11] fix crash while cleanup --- srtcore/api.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 59399a4b3..15c83fa57 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -3172,7 +3172,7 @@ void* srt::CUDTUnited::garbageCollect(void* p) for (sockets_t::iterator j = self->m_ClosedSockets.begin(); j != self->m_ClosedSockets.end(); ++ j) { - j->second->m_tsClosureTimeStamp = steady_clock::time_point(); + j->second->m_tsClosureTimeStamp = steady_clock::now(); } } From 6f9e130181138558c540902158defdacec8f5ef4 Mon Sep 17 00:00:00 2001 From: chengang06 Date: Wed, 20 Oct 2021 15:16:04 +0800 Subject: [PATCH 05/11] fix crash while cleanup --- srtcore/core.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index cbdb525c9..787533cea 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -10960,7 +10960,7 @@ void srt::CUDT::checkRexmitTimer(const steady_clock::time_point& currtime) // const uint64_t rtt_syn = (m_iSRTT + 4 * m_iRTTVar + 2 * COMM_SYN_INTERVAL_US); - const uint64_t exp_int_us = (m_iReXmitCount * rtt_syn + COMM_SYN_INTERVAL_US + count_microseconds(m_tdSendInterval)); + const uint64_t exp_int_us = (m_iReXmitCount * rtt_syn + COMM_SYN_INTERVAL_US); if (currtime <= (m_tsLastRspAckTime + microseconds_from(exp_int_us))) return; From a4cae8cee9854e7f91b6cfddb01b8d5b185257b3 Mon Sep 17 00:00:00 2001 From: chengang06 Date: Mon, 25 Oct 2021 14:54:39 +0800 Subject: [PATCH 06/11] fix cookie syn issues --- srtcore/core.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 787533cea..fae85c682 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -10535,7 +10535,7 @@ int srt::CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet) log << "processConnectRequest: received type=" << RequestTypeStr(hs.m_iReqType) << " - checking cookie..."); if (hs.m_iCookie != cookie_val) { - cookie_val = bake(addr, cookie_val, -1); // SHOULD generate an earlier, distracted cookie + cookie_val = bake(addr, cookie_val, 1); // SHOULD generate an earlier, distracted cookie if (hs.m_iCookie != cookie_val) { From 66b9dc8d3f4ff8b0565f5f2668ae03bec371db32 Mon Sep 17 00:00:00 2001 From: chengang06 Date: Mon, 25 Oct 2021 19:22:20 +0800 Subject: [PATCH 07/11] fix cookie syn issues --- srtcore/api.cpp | 2 +- srtcore/core.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 97abbf899..8e18bd23e 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -3178,7 +3178,7 @@ void* srt::CUDTUnited::garbageCollect(void* p) for (sockets_t::iterator j = self->m_ClosedSockets.begin(); j != self->m_ClosedSockets.end(); ++ j) { - j->second->m_tsClosureTimeStamp = steady_clock::now(); + j->second->m_tsClosureTimeStamp = steady_clock::time_point(); } } diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 46d428dab..d0849c6b8 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -10610,7 +10610,7 @@ int32_t srt::CUDT::bake(const sockaddr_any& addr, int32_t current_cookie, int co clientport, sizeof(clientport), NI_NUMERICHOST | NI_NUMERICSERV); - int64_t timestamp = (count_microseconds(steady_clock::now() - m_stats.tsStartTime) / 60000000) + distractor - + int64_t timestamp = (count_microseconds(steady_clock::now() - m_stats.tsStartTime) / 60000000) + distractor + correction; // secret changes every one minute stringstream cookiestr; cookiestr << clienthost << ":" << clientport << ":" << timestamp; @@ -10785,7 +10785,7 @@ int srt::CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet) log << "processConnectRequest: received type=" << RequestTypeStr(hs.m_iReqType) << " - checking cookie..."); if (hs.m_iCookie != cookie_val) { - cookie_val = bake(addr, cookie_val, 1); // SHOULD generate an earlier, distracted cookie + cookie_val = bake(addr, cookie_val, -1); // SHOULD generate an earlier, distracted cookie if (hs.m_iCookie != cookie_val) { From afe9ec8534eecb00d8ae89b6acd6f1015f4027e6 Mon Sep 17 00:00:00 2001 From: chengang06 Date: Tue, 30 Nov 2021 18:25:26 +0800 Subject: [PATCH 08/11] fix update connection status crash --- srtcore/api.cpp | 4 ++++ srtcore/api.h | 2 ++ srtcore/queue.cpp | 2 ++ 3 files changed, 8 insertions(+) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 8e18bd23e..031de483e 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -178,6 +178,7 @@ bool srt::CUDTSocket::broken() const srt::CUDTUnited::CUDTUnited(): m_Sockets(), m_GlobControlLock(), + m_UpdateConnStatusLock(), m_IDLock(), m_mMultiplexer(), m_MultiplexerLock(), @@ -197,6 +198,7 @@ srt::CUDTUnited::CUDTUnited(): // might destroy the application before `main`. This shouldn't // be a problem in general. setupMutex(m_GlobControlLock, "GlobControl"); + setupMutex(m_UpdateConnStatusLock, "UpdateConnectionStatus"); setupMutex(m_IDLock, "ID"); setupMutex(m_InitLock, "Init"); @@ -214,6 +216,7 @@ srt::CUDTUnited::~CUDTUnited() } releaseMutex(m_GlobControlLock); + releaseMutex(m_UpdateConnStatusLock); releaseMutex(m_IDLock); releaseMutex(m_InitLock); @@ -2572,6 +2575,7 @@ srt::CUDTSocket* srt::CUDTUnited::locatePeer( void srt::CUDTUnited::checkBrokenSockets() { + ScopedLock us(m_UpdateConnStatusLock); ScopedLock cg(m_GlobControlLock); #if ENABLE_EXPERIMENTAL_BONDING diff --git a/srtcore/api.h b/srtcore/api.h index 516322292..17259363b 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -381,6 +381,8 @@ friend class CRendezvousQueue; sync::Mutex m_GlobControlLock; // used to synchronize UDT API + sync::Mutex m_UpdateConnStatusLock; // used to synchronize Garbage Collector and UpdateConnectionStatus + sync::Mutex m_IDLock; // used to synchronize ID generation SRTSOCKET m_SocketIDGenerator; // seed to generate a new unique socket ID diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 62d158af7..23070ac3d 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -930,6 +930,8 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst // Need a stub value for a case when there's no unit provided ("storage depleted" case). // It should be normally NOT IN USE because in case of "storage depleted", rst != RST_OK. const SRTSOCKET dest_id = pkt ? pkt->m_iID : 0; + + ScopedLock us(CUDT::uglobal().m_UpdateConnStatusLock); // If no socket were qualified for further handling, finish here. // Otherwise toRemove and toProcess contain items to handle. From bdedc07061666887ca7ba7bd9b4e24f7d4d1588c Mon Sep 17 00:00:00 2001 From: chengang06 Date: Wed, 1 Dec 2021 14:27:24 +0800 Subject: [PATCH 09/11] fix update connection status crash --- srtcore/api.cpp | 61 ++++++++++++++++++++++++++++++------------------- srtcore/api.h | 8 +++---- 2 files changed, 41 insertions(+), 28 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 031de483e..166495c71 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -2573,7 +2573,7 @@ srt::CUDTSocket* srt::CUDTUnited::locatePeer( return NULL; } -void srt::CUDTUnited::checkBrokenSockets() +void srt::CUDTUnited::checkBrokenSockets(PlexerList& toDestroy) { ScopedLock us(m_UpdateConnStatusLock); ScopedLock cg(m_GlobControlLock); @@ -2714,11 +2714,11 @@ void srt::CUDTUnited::checkBrokenSockets() // remove those timeout sockets for (vector::iterator l = tbr.begin(); l != tbr.end(); ++ l) - removeSocket(*l); + removeSocket(*l, toDestroy); } // [[using locked(m_GlobControlLock)]] -void srt::CUDTUnited::removeSocket(const SRTSOCKET u) +void srt::CUDTUnited::removeSocket(const SRTSOCKET u, PlexerList& toDestroy) { sockets_t::iterator i = m_ClosedSockets.find(u); @@ -2813,26 +2813,37 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u) LOGC(smlog.Fatal, log << "IPE: For socket @" << u << " MUXER id=" << mid << " NOT FOUND!"); return; } + toDestroy.push_back(m); + - CMultiplexer& mx = m->second; + +} - mx.m_iRefCount --; - // HLOGF(smlog.Debug, "unrefing underlying socket for %u: %u\n", - // u, mx.m_iRefCount); - if (0 == mx.m_iRefCount) +void srt::CUDTUnited::tryDestroyMuxer(PlexerList& toDestroy) +{ + for (auto it = toDestroy.begin(); it != toDestroy.end(); ++it) { - HLOGC(smlog.Debug, log << "MUXER id=" << mid << " lost last socket @" - << u << " - deleting muxer bound to port " - << mx.m_pChannel->bindAddressAny().hport()); - // The channel has no access to the queues and - // it looks like the multiplexer is the master of all of them. - // The queues must be silenced before closing the channel - // because this will cause error to be returned in any operation - // being currently done in the queues, if any. - mx.m_pSndQueue->setClosing(); - mx.m_pRcvQueue->setClosing(); - mx.destroy(); - m_mMultiplexer.erase(m); + auto m = *it; + CMultiplexer& mx = m->second; + mx.m_iRefCount--; + // HLOGF(smlog.Debug, "unrefing underlying socket for %u: %u\n", + // u, mx.m_iRefCount); + + if (0 == mx.m_iRefCount) + { + HLOGC(smlog.Debug, + log << "MUXER id=" << mx.m_iID << " lost last socket - deleting muxer bound to port " + << mx.m_pChannel->bindAddressAny().hport()); + // The channel has no access to the queues and + // it looks like the multiplexer is the master of all of them. + // The queues must be silenced before closing the channel + // because this will cause error to be returned in any operation + // being currently done in the queues, if any. + mx.m_pSndQueue->setClosing(); + mx.m_pRcvQueue->setClosing(); + mx.destroy(); + m_mMultiplexer.erase(m); + } } } @@ -3136,8 +3147,9 @@ void* srt::CUDTUnited::garbageCollect(void* p) while (!self->m_bClosing) { INCREMENT_THREAD_ITERATIONS(); - self->checkBrokenSockets(); - + PlexerList PlexerstoDestroy; + self->checkBrokenSockets(PlexerstoDestroy); + self->tryDestroyMuxer(PlexerstoDestroy); HLOGC(inlog.Debug, log << "GC: sleep 1 s"); self->m_GCStopCond.wait_for(gclock, seconds_from(1)); } @@ -3189,8 +3201,9 @@ void* srt::CUDTUnited::garbageCollect(void* p) HLOGC(inlog.Debug, log << "GC: GLOBAL EXIT - releasing all CLOSED sockets."); while (true) { - self->checkBrokenSockets(); - + PlexerList PlexerstoDestroy; + self->checkBrokenSockets(PlexerstoDestroy); + self->tryDestroyMuxer(PlexerstoDestroy); enterCS(self->m_GlobControlLock); bool empty = self->m_ClosedSockets.empty(); leaveCS(self->m_GlobControlLock); diff --git a/srtcore/api.h b/srtcore/api.h index 17259363b..1cf85bb66 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -474,10 +474,10 @@ friend class CRendezvousQueue; #if ENABLE_EXPERIMENTAL_BONDING groups_t m_ClosedGroups; #endif - - void checkBrokenSockets(); - void removeSocket(const SRTSOCKET u); - + typedef std::list::iterator> PlexerList; + void checkBrokenSockets(PlexerList& toDestroy); + void removeSocket(const SRTSOCKET u, PlexerList& toDestroy); + void tryDestroyMuxer(PlexerList& toDestroy); CEPoll m_EPoll; // handling epoll data structures and events private: From 82b178af2c1e59e525fa766ec93bf16ecb63c6e1 Mon Sep 17 00:00:00 2001 From: chengang06 Date: Wed, 1 Dec 2021 16:59:23 +0800 Subject: [PATCH 10/11] fix update connection status crash --- srtcore/api.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 166495c71..878108463 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -2821,6 +2821,7 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u, PlexerList& toDestroy) void srt::CUDTUnited::tryDestroyMuxer(PlexerList& toDestroy) { + ScopedLock cg(m_GlobControlLock); for (auto it = toDestroy.begin(); it != toDestroy.end(); ++it) { auto m = *it; From 1bb92a059b219943023bb7d91c1c3077f2562bf6 Mon Sep 17 00:00:00 2001 From: chengang06 Date: Wed, 1 Dec 2021 17:11:52 +0800 Subject: [PATCH 11/11] fix update connection status crash --- srtcore/api.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 878108463..93a10f834 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -2822,9 +2822,9 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u, PlexerList& toDestroy) void srt::CUDTUnited::tryDestroyMuxer(PlexerList& toDestroy) { ScopedLock cg(m_GlobControlLock); - for (auto it = toDestroy.begin(); it != toDestroy.end(); ++it) + for (PlexerList::iterator it = toDestroy.begin(); it != toDestroy.end(); ++it) { - auto m = *it; + std::map::iterator m = *it; CMultiplexer& mx = m->second; mx.m_iRefCount--; // HLOGF(smlog.Debug, "unrefing underlying socket for %u: %u\n",