diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 98999a81f..7adc9aa47 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -1129,8 +1129,6 @@ srt::CRcvQueue::CRcvQueue() , m_iIPversion() , m_szPayloadSize() , m_bClosing(false) - , m_LSLock() - , m_pListener(NULL) , m_pRendezvousQueue(NULL) , m_vNewEntry() , m_IDLock() @@ -1404,11 +1402,13 @@ srt::EConnectStatus srt::CRcvQueue::worker_ProcessConnectionRequest(CUnit* unit, int listener_ret = SRT_REJ_UNKNOWN; bool have_listener = false; { - ScopedLock cg(m_LSLock); - if (m_pListener) + SharedLock shl(m_pListener); + CUDT* pListener = m_pListener.getPtrNoLock(); + + if (pListener) { - LOGC(cnlog.Debug, log << "PASSING request from: " << addr.str() << " to listener:" << m_pListener->socketID()); - listener_ret = m_pListener->processConnectRequest(addr, unit->m_Packet); + LOGC(cnlog.Debug, log << "PASSING request from: " << addr.str() << " to listener:" << pListener->socketID()); + listener_ret = pListener->processConnectRequest(addr, unit->m_Packet); // This function does return a code, but it's hard to say as to whether // anything can be done about it. In case when it's stated possible, the @@ -1690,21 +1690,15 @@ int srt::CRcvQueue::recvfrom(int32_t id, CPacket& w_packet) int srt::CRcvQueue::setListener(CUDT* u) { - ScopedLock lslock(m_LSLock); - - if (NULL != m_pListener) + if (!m_pListener.set(u)) return -1; - m_pListener = u; return 0; } void srt::CRcvQueue::removeListener(const CUDT* u) { - ScopedLock lslock(m_LSLock); - - if (u == m_pListener) - m_pListener = NULL; + m_pListener.clearIf(u); } void srt::CRcvQueue::registerConnector(const SRTSOCKET& id, diff --git a/srtcore/queue.h b/srtcore/queue.h index dd68a7721..43219736f 100644 --- a/srtcore/queue.h +++ b/srtcore/queue.h @@ -554,9 +554,8 @@ class CRcvQueue void storePktClone(int32_t id, const CPacket& pkt); private: - sync::Mutex m_LSLock; - CUDT* m_pListener; // pointer to the (unique, if any) listening UDT entity - CRendezvousQueue* m_pRendezvousQueue; // The list of sockets in rendezvous mode + sync::CSharedObjectPtr m_pListener; // pointer to the (unique, if any) listening UDT entity + CRendezvousQueue* m_pRendezvousQueue; // The list of sockets in rendezvous mode std::vector m_vNewEntry; // newly added entries, to be inserted sync::Mutex m_IDLock; diff --git a/srtcore/sync.h b/srtcore/sync.h index 8fee25831..e8b7444bd 100644 --- a/srtcore/sync.h +++ b/srtcore/sync.h @@ -343,15 +343,19 @@ class SRT_ATTR_CAPABILITY("mutex") Mutex pthread_mutex_t m_mutex; }; -/// A pthread version of std::chrono::scoped_lock (or lock_guard for C++11) +/// A pthread version of std::scoped_lock (or lock_guard for C++11). class SRT_ATTR_SCOPED_CAPABILITY ScopedLock { public: SRT_ATTR_ACQUIRE(m) - explicit ScopedLock(Mutex& m); + explicit ScopedLock(Mutex& m) + : m_mutex(m) + { + m_mutex.lock(); + } SRT_ATTR_RELEASE() - ~ScopedLock(); + ~ScopedLock() { m_mutex.unlock(); } private: Mutex& m_mutex; @@ -481,6 +485,122 @@ class Condition inline void setupCond(Condition& cv, const char*) { cv.init(); } inline void releaseCond(Condition& cv) { cv.destroy(); } +/////////////////////////////////////////////////////////////////////////////// +// +// Shared Mutex section +// +/////////////////////////////////////////////////////////////////////////////// + +/// Implementation of a read-write mutex. +/// This allows multiple readers at a time, or a single writer. +/// TODO: The class can be improved if needed to give writer a preference +/// by adding additional m_iWritersWaiting member variable (counter). +/// TODO: The m_iCountRead could be made atomic to make unlok_shared() faster and lock-free. +class SharedMutex +{ +public: + SharedMutex(); + ~SharedMutex(); + +public: + /// Acquire the lock for writting purposes. Only one thread can acquire this lock at a time + /// Once it is locked, no reader can acquire it + void lock(); + bool try_lock(); + void unlock(); + + /// Acquire the lock if no writter already has it. For read purpose only + /// Several readers can lock this at the same time. + void lock_shared(); + bool try_lock_shared(); + void unlock_shared(); + + int getReaderCount() const; + +protected: + Condition m_LockWriteCond; + Condition m_LockReadCond; + + mutable Mutex m_Mutex; + + int m_iCountRead; + bool m_bWriterLocked; +}; + +/// A version of std::scoped_lock (or lock_guard for C++11). +/// We could have used the srt::sync::ScopedLock making it a template-based class. +/// But in that case all usages would have to be specificed like ScopedLock in C++03. +class SRT_ATTR_SCOPED_CAPABILITY ExclusiveLock +{ +public: + SRT_ATTR_ACQUIRE(m) + explicit ExclusiveLock(SharedMutex& m) + : m_mutex(m) + { + m_mutex.lock(); + } + + SRT_ATTR_RELEASE() + ~ExclusiveLock() { m_mutex.unlock(); } + +private: + SharedMutex& m_mutex; +}; + +/// A reduced implementation of the std::shared_lock functionality (available in C++14). +class SRT_ATTR_SCOPED_CAPABILITY SharedLock +{ +public: + SRT_ATTR_ACQUIRE_SHARED(m) + explicit SharedLock(SharedMutex& m) + : m_mtx(m) + { + m_mtx.lock_shared(); + } + + SRT_ATTR_RELEASE_SHARED(m_mtx) + ~SharedLock() { m_mtx.unlock_shared(); } + +private: + SharedMutex& m_mtx; +}; + +/// A class template for a shared object. It is a wrapper around a pointer to an object +/// and a shared mutex. It allows multiple readers to access the object at the same time, +/// but only one writer can access the object at a time. +template +class CSharedObjectPtr : public SharedMutex +{ +public: + CSharedObjectPtr() + : m_pObj(NULL) + { + } + + bool set(T* pObj) + { + ExclusiveLock lock(*this); + if (m_pObj) + return false; + m_pObj = pObj; + return true; + } + + bool clearIf(const T* pObj) + { + ExclusiveLock lock(*this); + if (m_pObj != pObj) + return false; + m_pObj = NULL; + return true; + } + + T* getPtrNoLock() const { return m_pObj; } + +private: + T* m_pObj; +}; + /////////////////////////////////////////////////////////////////////////////// // // Event (CV) section @@ -943,43 +1063,6 @@ CUDTException& GetThreadLocalError(); /// @param[in] maxVal maximum allowed value of the resulting random number. int genRandomInt(int minVal, int maxVal); - -/// Implementation of a read-write mutex. -/// This allows multiple readers at a time, or a single writer. -/// TODO: The class can be improved if needed to give writer a preference -/// by adding additional m_iWritersWaiting member variable (counter). -/// TODO: The m_iCountRead could be made atomic to make unlok_shared() faster and lock-free. -class SharedMutex -{ -public: - SharedMutex(); - ~SharedMutex(); - -private: - Condition m_LockWriteCond; - Condition m_LockReadCond; - - mutable Mutex m_Mutex; - - int m_iCountRead; - bool m_bWriterLocked; - - /// Acquire the lock for writting purposes. Only one thread can acquire this lock at a time - /// Once it is locked, no reader can acquire it -public: - void lock(); - bool try_lock(); - void unlock(); - - /// Acquire the lock if no writter already has it. For read purpose only - /// Several readers can lock this at the same time. - void lock_shared(); - bool try_lock_shared(); - void unlock_shared(); - - int getReaderCount() const; -}; - } // namespace sync } // namespace srt diff --git a/srtcore/sync_posix.cpp b/srtcore/sync_posix.cpp index 8cb475ea7..8d7561a19 100644 --- a/srtcore/sync_posix.cpp +++ b/srtcore/sync_posix.cpp @@ -230,18 +230,6 @@ bool srt::sync::Mutex::try_lock() return (pthread_mutex_trylock(&m_mutex) == 0); } -srt::sync::ScopedLock::ScopedLock(Mutex& m) - : m_mutex(m) -{ - m_mutex.lock(); -} - -srt::sync::ScopedLock::~ScopedLock() -{ - m_mutex.unlock(); -} - - srt::sync::UniqueLock::UniqueLock(Mutex& m) : m_Mutex(m) {