From 7859daa7c917bba19d45bff804ca2f56759a4412 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Wed, 20 Apr 2022 16:43:04 +0200 Subject: [PATCH 1/5] [core] Introduced the CSharedResource and related scoped locks. srt::sync::CThread::id: added the default constructor, dropped constantness. --- srtcore/sync.cpp | 50 ++++++++++++++++++++++++ srtcore/sync.h | 99 +++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 148 insertions(+), 1 deletion(-) diff --git a/srtcore/sync.cpp b/srtcore/sync.cpp index fdf5a0b66..91815f238 100644 --- a/srtcore/sync.cpp +++ b/srtcore/sync.cpp @@ -353,3 +353,53 @@ int srt::sync::genRandomInt(int minVal, int maxVal) #endif // HAVE_CXX11 } +//////////////////////////////////////////////////////////////////////////////// +// +// CSharedResource class +// +//////////////////////////////////////////////////////////////////////////////// + +void srt::sync::CSharedResource::release() +{ + ScopedLock lock(m_lock); + SRT_ASSERT(m_iResourceTaken > 0); + --m_iResourceTaken; + if (m_iResourceTaken == 0) + m_cond.notify_one(); +} + +void srt::sync::CSharedResource::acquire() +{ + UniqueLock lock(m_lock); + + // Allow reacquiring the resource from the same thread + if (m_iResourceTaken > 0 && this_thread::get_id() == m_thid) + { + ++m_iResourceTaken; + return; + } + + while (m_iResourceTaken > 0) + { + m_cond.wait(lock); + } + + SRT_ASSERT(m_iResourceTaken == 0); + ++m_iResourceTaken; + m_thid = this_thread::get_id(); +} + +bool srt::sync::CSharedResource::tryAcquire() +{ + ScopedLock lock(m_lock); + + // Immediately withdraw from trying to acquire the taken resource from a different thread. + if (m_iResourceTaken > 0 && this_thread::get_id() != m_thid) + { + return false; + } + + ++m_iResourceTaken; + m_thid = this_thread::get_id(); + return true; +} diff --git a/srtcore/sync.h b/srtcore/sync.h index b68061cf6..aae17a648 100644 --- a/srtcore/sync.h +++ b/srtcore/sync.h @@ -770,11 +770,17 @@ class CThread struct id { + id() + : value() + { + } + explicit id(const pthread_t t) : value(t) {} - const pthread_t value; + pthread_t value; + inline bool operator==(const id& second) const { return pthread_equal(value, second.value) != 0; @@ -871,6 +877,97 @@ CUDTException& GetThreadLocalError(); /// @param[in] minVal minimum allowed value of the resulting random number. /// @param[in] maxVal maximum allowed value of the resulting random number. int genRandomInt(int minVal, int maxVal); + +//////////////////////////////////////////////////////////////////////////////// +// +// CSharedResource class +// +//////////////////////////////////////////////////////////////////////////////// + +/// The class to synchronize acces to a shared resource. +/// Can be acquired multiple times by the same thread. +class CSharedResource +{ +public: + CSharedResource() + : m_iResourceTaken(0) + { + } + + /// Releases the shared resource. Can taken several times by the same thread, + /// therefore notifies another thread once finally release by this thread. + void release(); + + /// Acquires the shared resource. Can be acquired multiple times from the same thread, + /// would just increase the reference count. It the resource is taken, waits on a CV for a notification. + void acquire(); + + /// Tries to acquire the shared resource. + /// @return true if the resource has been acquired, false otherwise. + bool tryAcquire(); + + /// @brief To follow POSIX Condition::init(). + void init() + { + m_cond.init(); + } + + /// @brief To follow POSIX Condition::destroy(). + void destroy() + { + m_cond.destroy(); + } + +private: + Mutex m_lock; + Condition m_cond; + int m_iResourceTaken; + CThread::id m_thid; +}; + +/// Acquires the shared resource on creation, releases upon destruction. +class CScopedResourceLock +{ +public: + CScopedResourceLock(CSharedResource& resource) + : m_rsrc(resource) + { + m_rsrc.acquire(); + } + + ~CScopedResourceLock() + { + m_rsrc.release(); + } + +private: + CSharedResource& m_rsrc; +}; + +/// Tries to acquire the shared resource on creation, releases if was aquired upon destruction. +class CScopedResourceTryLock +{ +public: + CScopedResourceTryLock(CSharedResource& resource) + : m_rsrc(resource) + { + m_isAcquired = m_rsrc.tryAcquire(); + } + + ~CScopedResourceTryLock() + { + if (m_isAcquired) + m_rsrc.release(); + } + + explicit operator bool() const { return m_isAcquired; } + + bool operator!() const { return !m_isAcquired; } + +private: + CSharedResource& m_rsrc; + bool m_isAcquired; +}; } // namespace sync } // namespace srt From 056ccbff11d8fbeb3dc4790083e88a1e125b9d5f Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Wed, 20 Apr 2022 16:43:56 +0200 Subject: [PATCH 2/5] [core] Converted CUDT::m_ConnectionLock to CSharedResource. --- srtcore/core.cpp | 37 ++++++++++++++++++------------------ srtcore/core.h | 22 ++++++++++----------- srtcore/group.cpp | 2 +- srtcore/queue.cpp | 2 +- test/test_socket_options.cpp | 2 +- 5 files changed, 33 insertions(+), 32 deletions(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index b402081d6..c3020409a 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -387,7 +387,7 @@ void srt::CUDT::setOpt(SRT_SOCKOPT optName, const void* optval, int optlen) // Restriction check const int oflags = s_sockopt_action.flags[optName]; - ScopedLock cg (m_ConnectionLock); + CScopedResourceLock cg (m_ConnectionResources); ScopedLock sendguard (m_SendLock); ScopedLock recvguard (m_RecvLock); @@ -436,7 +436,7 @@ void srt::CUDT::setOpt(SRT_SOCKOPT optName, const void* optval, int optlen) void srt::CUDT::getOpt(SRT_SOCKOPT optName, void *optval, int &optlen) { - ScopedLock cg(m_ConnectionLock); + CScopedResourceLock cg(m_ConnectionResources); switch (optName) { @@ -912,7 +912,7 @@ void srt::CUDT::clearData() void srt::CUDT::open() { - ScopedLock cg(m_ConnectionLock); + CScopedResourceLock cg(m_ConnectionResources); clearData(); @@ -965,7 +965,7 @@ void srt::CUDT::open() void srt::CUDT::setListenState() { - ScopedLock cg(m_ConnectionLock); + CScopedResourceLock cg(m_ConnectionResources); if (!m_bOpened) throw CUDTException(MJ_NOTSUP, MN_NONE, 0); @@ -3346,7 +3346,7 @@ void srt::CUDT::synchronizeWithGroup(CUDTGroup* gp) void srt::CUDT::startConnect(const sockaddr_any& serv_addr, int32_t forced_isn) { - ScopedLock cg (m_ConnectionLock); + CScopedResourceLock cg (m_ConnectionResources); HLOGC(aclog.Debug, log << CONID() << "startConnect: -> " << serv_addr.str() << (m_config.bSynRecving ? " (SYNCHRONOUS)" : " (ASYNCHRONOUS)") << "..."); @@ -3481,7 +3481,7 @@ void srt::CUDT::startConnect(const sockaddr_any& serv_addr, int32_t forced_isn) /* * Race condition if non-block connect response thread scheduled before we set m_bConnecting to true? * Connect response will be ignored and connecting will wait until timeout. - * Maybe m_ConnectionLock handling problem? Not used in CUDT::connect(const CPacket& response) + * Maybe m_ConnectionResources handling problem? Not used in CUDT::connect(const CPacket& response) */ m_tsLastReqTime = now; m_bConnecting = true; @@ -3728,7 +3728,7 @@ EConnectStatus srt::CUDT::processAsyncConnectResponse(const CPacket &pkt) ATR_NO EConnectStatus cst = CONN_CONTINUE; CUDTException e; - ScopedLock cg(m_ConnectionLock); + CScopedResourceLock cg(m_ConnectionResources); HLOGC(cnlog.Debug, log << CONID() << "processAsyncConnectResponse: got response for connect request, processing"); cst = processConnectResponse(pkt, &e); @@ -3766,7 +3766,7 @@ bool srt::CUDT::processAsyncConnectRequest(EReadStatus rst, bool status = true; - ScopedLock cg(m_ConnectionLock); + CScopedResourceLock cg(m_ConnectionResources); if (!m_bOpened) // Check the socket has not been closed before already. return false; @@ -4242,10 +4242,10 @@ EConnectStatus srt::CUDT::processRendezvous( return CONN_CONTINUE; } -// [[using locked(m_ConnectionLock)]]; +// [[using locked(m_ConnectionResources)]]; EConnectStatus srt::CUDT::processConnectResponse(const CPacket& response, CUDTException* eout) ATR_NOEXCEPT { - // NOTE: ASSUMED LOCK ON: m_ConnectionLock. + // NOTE: ASSUMED LOCK ON: m_ConnectionResources. // this is the 2nd half of a connection request. If the connection is setup successfully this returns 0. // Returned values: @@ -4615,7 +4615,7 @@ EConnectStatus srt::CUDT::postConnect(const CPacket* pResponse, bool rendezvous, // And, I am connected too. m_bConnecting = false; - // The lock on m_ConnectionLock should still be applied, but + // The lock on m_ConnectionResources should still be applied, but // the socket could have been started removal before this function // has started. Do a sanity check before you continue with the // connection process. @@ -5654,7 +5654,7 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& { HLOGC(cnlog.Debug, log << "acceptAndRespond: setting up data according to handshake"); - ScopedLock cg(m_ConnectionLock); + CScopedResourceLock cg(m_ConnectionResources); m_tsRcvPeerStartTime = steady_clock::time_point(); // will be set correctly at SRT HS @@ -6145,7 +6145,7 @@ bool srt::CUDT::closeInternal() HLOGC(smlog.Debug, log << CONID() << "CLOSING STATE. Acquiring connection lock"); - ScopedLock connectguard(m_ConnectionLock); + CScopedResourceLock connectguard(m_ConnectionResources); // Signal the sender and recver if they are waiting for data. releaseSynch(); @@ -7425,7 +7425,8 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous) perf->mbpsBandwidth = Bps2Mbps(availbw * (m_iMaxSRTPayloadSize + pktHdrSize)); - if (tryEnterCS(m_ConnectionLock)) + //if (m_ConnectionResources.tryAcquire()) + if (CScopedResourceTryLock cg = m_ConnectionResources) { if (m_pSndBuffer) { @@ -7471,7 +7472,7 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous) perf->msRcvBuf = 0; } - leaveCS(m_ConnectionLock); + //m_ConnectionResources.release(); } else { @@ -7613,9 +7614,9 @@ void srt::CUDT::initSynch() setupMutex(m_RcvLossLock, "RcvLoss"); setupMutex(m_RecvAckLock, "RecvAck"); setupMutex(m_RcvBufferLock, "RcvBuffer"); - setupMutex(m_ConnectionLock, "Connection"); setupMutex(m_StatsLock, "Stats"); setupCond(m_RcvTsbPdCond, "RcvTsbPd"); + m_ConnectionResources.init(); } void srt::CUDT::destroySynch() @@ -7636,7 +7637,7 @@ void srt::CUDT::destroySynch() releaseMutex(m_RcvLossLock); releaseMutex(m_RecvAckLock); releaseMutex(m_RcvBufferLock); - releaseMutex(m_ConnectionLock); + m_ConnectionResources.destroy(); releaseMutex(m_StatsLock); m_RcvTsbPdCond.notify_all(); @@ -9347,7 +9348,7 @@ std::pair srt::CUDT::packData(CPacket& w_packet) string reason = "reXmit"; - ScopedLock connectguard(m_ConnectionLock); + CScopedResourceLock connectguard(m_ConnectionResources); // If a closing action is done simultaneously, then // m_bOpened should already be false, and it's set // just before releasing this lock. diff --git a/srtcore/core.h b/srtcore/core.h index 8fc1b8f62..fe70b38f2 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -306,7 +306,7 @@ class CUDT void addressAndSend(CPacket& pkt); - SRT_ATTR_REQUIRES(m_ConnectionLock) + SRT_ATTR_REQUIRES(m_ConnectionResources) void sendSrtMsg(int cmd, uint32_t *srtdata_in = NULL, size_t srtlen_in = 0); bool isOPT_TsbPd() const { return m_config.bTSBPD; } @@ -401,7 +401,7 @@ class CUDT // immediately to free the socket void notListening() { - sync::ScopedLock cg(m_ConnectionLock); + sync::CScopedResourceLock cg(m_ConnectionResources); m_bListening = false; m_pRcvQueue->removeListener(this); } @@ -466,7 +466,7 @@ class CUDT /// @retval 1 Connection in progress (m_ConnReq turned into RESPONSE) /// @retval -1 Connection failed - SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock) + SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionResources) EConnectStatus processConnectResponse(const CPacket& pkt, CUDTException* eout) ATR_NOEXCEPT; // This function works in case of HSv5 rendezvous. It changes the state @@ -486,14 +486,14 @@ class CUDT /// @param response incoming handshake response packet to be interpreted /// @param serv_addr incoming packet's address /// @param rst Current read status to know if the HS packet was freshly received from the peer, or this is only a periodic update (RST_AGAIN) - SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock) + SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionResources) EConnectStatus processRendezvous(const CPacket* response, const sockaddr_any& serv_addr, EReadStatus, CPacket& reqpkt); /// Create the CryptoControl object based on the HS packet. Allocates sender and receiver buffers and loss lists. - SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock) + SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionResources) bool prepareConnectionObjects(const CHandShake &hs, HandshakeSide hsd, CUDTException *eout); - SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock) + SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionResources) EConnectStatus postConnect(const CPacket* response, bool rendezvous, CUDTException* eout) ATR_NOEXCEPT; SRT_ATR_NODISCARD bool applyResponseSettings() ATR_NOEXCEPT; @@ -507,7 +507,7 @@ class CUDT SRT_ATR_NODISCARD size_t fillSrtHandshake_HSRSP(uint32_t* srtdata, size_t srtlen, int hs_version); SRT_ATR_NODISCARD size_t fillSrtHandshake(uint32_t* srtdata, size_t srtlen, int msgtype, int hs_version); - SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock) + SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionResources) bool createSrtHandshake(int srths_cmd, int srtkm_cmd, const uint32_t* data, size_t datalen, CPacket& w_reqpkt, CHandShake& w_hs); @@ -515,7 +515,7 @@ class CUDT #if ENABLE_EXPERIMENTAL_BONDING SRT_ATR_NODISCARD size_t fillHsExtGroup(uint32_t *pcmdspec); #endif - SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock) + SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionResources) size_t fillHsExtKMREQ(uint32_t *pcmdspec, size_t ki); SRT_ATR_NODISCARD size_t fillHsExtKMRSP(uint32_t *pcmdspec, const uint32_t *kmdata, size_t kmdata_wordsize); @@ -763,7 +763,7 @@ class CUDT int m_iTsbPdDelay_ms; // Rx delay to absorb burst, in milliseconds int m_iPeerTsbPdDelay_ms; // Tx delay that the peer uses to absorb burst, in milliseconds bool m_bTLPktDrop; // Enable Too-late Packet Drop - SRT_ATTR_PT_GUARDED_BY(m_ConnectionLock) + SRT_ATTR_PT_GUARDED_BY(m_ConnectionResources) UniquePtr m_pCryptoControl; // Crypto control module CCache* m_pCache; // Network information cache @@ -965,7 +965,7 @@ class CUDT private: // synchronization: mutexes and conditions - sync::Mutex m_ConnectionLock; // used to synchronize connection operation + sync::CSharedResource m_ConnectionResources; ///< Protects CUDT resources from simultaneous access, including creating and destroying dynamic objects. sync::Condition m_SendBlockCond; // used to block "send" call sync::Mutex m_SendBlockLock; // lock associated to m_SendBlockCond @@ -998,7 +998,7 @@ class CUDT // Failure to create the crypter means that an encrypted // connection should be rejected if ENFORCEDENCRYPTION is on. - SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock) + SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionResources) bool createCrypter(HandshakeSide side, bool bidi); private: // Generation and processing of packets diff --git a/srtcore/group.cpp b/srtcore/group.cpp index db50b3ec2..ea6df18ad 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -448,7 +448,7 @@ void CUDTGroup::setOpt(SRT_SOCKOPT optName, const void* optval, int optlen) std::vector ps_vec; { // Do copy to avoid deadlock. CUDT::setOpt() cannot be called directly inside this loop, because - // CUDT::setOpt() will lock m_ConnectionLock, which should be locked before m_GroupLock. + // CUDT::setOpt() will lock m_ConnectionResources, which should be locked before m_GroupLock. ScopedLock gg(m_GroupLock); for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi) { diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 1e57b3021..e931cab65 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -994,7 +994,7 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst // because the next CUDT::close will not remove it from the queue when m_bConnecting = false, // and may crash on next pass. // - // TODO: maybe lock i->u->m_ConnectionLock? + // TODO: maybe lock i->u->m_ConnectionResources? i->u->m_bConnecting = false; remove(i->u->m_SocketID); diff --git a/test/test_socket_options.cpp b/test/test_socket_options.cpp index 15d3ffadb..48ada06a4 100644 --- a/test/test_socket_options.cpp +++ b/test/test_socket_options.cpp @@ -498,7 +498,7 @@ static const char* const socket_state_array[] = { const char* const* g_socket_state = socket_state_array + 1; #if 0 -// No socket option can be set in blocking mode because m_ConnectionLock is required by both srt_setsockopt and srt_connect +// No socket option can be set in blocking mode because m_ConnectionResources is required by both srt_setsockopt and srt_connect // TODO: Use non-blocking mode TEST_F(TestSocketOptions, RestrictionCallerConnecting) { From 559c49d3b1ab5de088497511045fef1ed8ca1331 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Wed, 20 Apr 2022 16:45:28 +0200 Subject: [PATCH 3/5] [core] Protect connection resources when inside CUDT::processCtrl(..) by acquiring the m_ConnectionResources. Fixes #2234. --- srtcore/core.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index c3020409a..33b5a9d1b 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -8922,6 +8922,7 @@ void srt::CUDT::processCtrlUserDefined(const CPacket& ctrlpkt) void srt::CUDT::processCtrl(const CPacket &ctrlpkt) { + CScopedResourceLock cg(m_ConnectionResources); // Just heard from the peer, reset the expiration count. m_iEXPCount = 1; const steady_clock::time_point currtime = steady_clock::now(); From 2ec15ac71d499187f483c16d7cb10e5c94e9ebb8 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Wed, 20 Apr 2022 17:41:07 +0200 Subject: [PATCH 4/5] Checking some build fixes --- srtcore/sync.h | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/srtcore/sync.h b/srtcore/sync.h index aae17a648..d6fe7bfd6 100644 --- a/srtcore/sync.h +++ b/srtcore/sync.h @@ -785,6 +785,11 @@ class CThread { return pthread_equal(value, second.value) != 0; } + + inline bool operator!=(const id& second) const + { + return pthread_equal(value, second.value) == 0; + } }; /// Returns the id of the current thread. @@ -960,7 +965,7 @@ class CScopedResourceTryLock m_rsrc.release(); } - explicit operator bool() const { return m_isAcquired; } + operator bool() const { return m_isAcquired; } bool operator!() const { return !m_isAcquired; } From dfe367ac334d27092be2cf8e944311508f78f809 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Thu, 21 Apr 2022 10:37:25 +0200 Subject: [PATCH 5/5] Can't lock the whole processCtrl because of the SHUTDOWN ctrl --- srtcore/core.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 33b5a9d1b..fcc368336 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -1963,6 +1963,7 @@ bool srt::CUDT::processSrtMsg(const CPacket *ctrlpkt) // and the appropriate message must be constructed for sending. // No further processing required { + CScopedResourceLock cg(m_ConnectionResources); uint32_t srtdata_out[SRTDATA_MAXSIZE]; size_t len_out = 0; res = m_pCryptoControl->processSrtMsg_KMREQ(srtdata, len, CUDT::HS_VERSION_UDT4, @@ -8922,7 +8923,6 @@ void srt::CUDT::processCtrlUserDefined(const CPacket& ctrlpkt) void srt::CUDT::processCtrl(const CPacket &ctrlpkt) { - CScopedResourceLock cg(m_ConnectionResources); // Just heard from the peer, reset the expiration count. m_iEXPCount = 1; const steady_clock::time_point currtime = steady_clock::now();