From e20f90c3d6705947537b50270a1afdff025c5a37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Fri, 3 Jan 2020 17:31:08 +0100 Subject: [PATCH 1/6] Added CSync utlity for CV --- srtcore/cache.h | 10 +-- srtcore/common.cpp | 71 --------------- srtcore/common.h | 55 ------------ srtcore/core.cpp | 105 +++++++++++----------- srtcore/packetfilter.cpp | 1 + srtcore/queue.cpp | 28 +++--- srtcore/sync.cpp | 189 +++++++++++++++++++++++++++++++++++++-- srtcore/sync.h | 115 ++++++++++++++++++++++++ srtcore/window.h | 8 +- 9 files changed, 371 insertions(+), 211 deletions(-) diff --git a/srtcore/cache.h b/srtcore/cache.h index 346f9d813..141fe26c6 100644 --- a/srtcore/cache.h +++ b/srtcore/cache.h @@ -44,7 +44,7 @@ written by #include #include -#include "common.h" +#include "sync.h" #include "udt.h" class CCacheItem @@ -82,13 +82,13 @@ template class CCache m_iCurrSize(0) { m_vHashPtr.resize(m_iHashSize); - CGuard::createMutex(m_Lock); + srt::sync::CGuard::createMutex(m_Lock); } ~CCache() { clear(); - CGuard::releaseMutex(m_Lock); + srt::sync::CGuard::releaseMutex(m_Lock); } public: @@ -98,7 +98,7 @@ template class CCache int lookup(T* data) { - CGuard cacheguard(m_Lock); + srt::sync::CGuard cacheguard(m_Lock); int key = data->getKey(); if (key < 0) @@ -126,7 +126,7 @@ template class CCache int update(T* data) { - CGuard cacheguard(m_Lock); + srt::sync::CGuard cacheguard(m_Lock); int key = data->getKey(); if (key < 0) diff --git a/srtcore/common.cpp b/srtcore/common.cpp index c8485e797..d0d71d617 100644 --- a/srtcore/common.cpp +++ b/srtcore/common.cpp @@ -217,77 +217,6 @@ void CTimer::sleep() #endif } -int CTimer::condTimedWaitUS(pthread_cond_t* cond, pthread_mutex_t* mutex, uint64_t delay) { - timeval now; - gettimeofday(&now, 0); - const uint64_t time_us = now.tv_sec * uint64_t(1000000) + now.tv_usec + delay; - timespec timeout; - timeout.tv_sec = time_us / 1000000; - timeout.tv_nsec = (time_us % 1000000) * 1000; - - return pthread_cond_timedwait(cond, mutex, &timeout); -} - - -// Automatically lock in constructor -CGuard::CGuard(pthread_mutex_t& lock, bool shouldwork): - m_Mutex(lock), - m_iLocked(-1) -{ - if (shouldwork) - m_iLocked = pthread_mutex_lock(&m_Mutex); -} - -// Automatically unlock in destructor -CGuard::~CGuard() -{ - if (m_iLocked == 0) - pthread_mutex_unlock(&m_Mutex); -} - -// After calling this on a scoped lock wrapper (CGuard), -// the mutex will be unlocked right now, and no longer -// in destructor -void CGuard::forceUnlock() -{ - if (m_iLocked == 0) - { - pthread_mutex_unlock(&m_Mutex); - m_iLocked = -1; - } -} - -int CGuard::enterCS(pthread_mutex_t& lock) -{ - return pthread_mutex_lock(&lock); -} - -int CGuard::leaveCS(pthread_mutex_t& lock) -{ - return pthread_mutex_unlock(&lock); -} - -void CGuard::createMutex(pthread_mutex_t& lock) -{ - pthread_mutex_init(&lock, NULL); -} - -void CGuard::releaseMutex(pthread_mutex_t& lock) -{ - pthread_mutex_destroy(&lock); -} - -void CGuard::createCond(pthread_cond_t& cond) -{ - pthread_cond_init(&cond, NULL); -} - -void CGuard::releaseCond(pthread_cond_t& cond) -{ - pthread_cond_destroy(&cond); -} - -// CUDTException::CUDTException(CodeMajor major, CodeMinor minor, int err): m_iMajor(major), m_iMinor(minor) diff --git a/srtcore/common.h b/srtcore/common.h index efa7f43aa..505f9f18e 100644 --- a/srtcore/common.h +++ b/srtcore/common.h @@ -572,8 +572,6 @@ class CTimer /// @retval 0 Wait was successfull /// @retval ETIMEDOUT The wait timed out - static int condTimedWaitUS(pthread_cond_t* cond, pthread_mutex_t* mutex, uint64_t delay); - private: srt::sync::steady_clock::time_point m_tsSchedTime; // next schedulled time @@ -586,59 +584,6 @@ class CTimer //////////////////////////////////////////////////////////////////////////////// -class CGuard -{ -public: - /// Constructs CGuard, which locks the given mutex for - /// the scope where this object exists. - /// @param lock Mutex to lock - /// @param if_condition If this is false, CGuard will do completely nothing - CGuard(pthread_mutex_t& lock, bool if_condition = true); - ~CGuard(); - -public: - static int enterCS(pthread_mutex_t& lock); - static int leaveCS(pthread_mutex_t& lock); - - static void createMutex(pthread_mutex_t& lock); - static void releaseMutex(pthread_mutex_t& lock); - - static void createCond(pthread_cond_t& cond); - static void releaseCond(pthread_cond_t& cond); - - void forceUnlock(); - -private: - pthread_mutex_t& m_Mutex; // Alias name of the mutex to be protected - int m_iLocked; // Locking status - - CGuard& operator=(const CGuard&); -}; - -class InvertedGuard -{ - pthread_mutex_t* m_pMutex; -public: - - InvertedGuard(pthread_mutex_t* smutex): m_pMutex(smutex) - { - if ( !smutex ) - return; - - CGuard::leaveCS(*smutex); - } - - ~InvertedGuard() - { - if ( !m_pMutex ) - return; - - CGuard::enterCS(*m_pMutex); - } -}; - -//////////////////////////////////////////////////////////////////////////////// - // UDT Sequence Number 0 - (2^31 - 1) // seqcmp: compare two seq#, considering the wraping diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 8123fdce2..66ff39ec3 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -4717,7 +4717,10 @@ void *CUDT::tsbpd(void *param) THREAD_STATE_INIT("SRT:TsbPd"); - CGuard::enterCS(self->m_RecvLock); + CGuard recv_gl (self->m_RecvLock); + CSync recvdata_cc (self->m_RecvDataCond, recv_gl); + CSync tsbpd_cc (self->m_RcvTsbPdCond, recv_gl); + self->m_bTsbPdAckWakeup = true; while (!self->m_bClosing) { @@ -4819,7 +4822,7 @@ void *CUDT::tsbpd(void *param) */ if (self->m_bSynRecving) { - pthread_cond_signal(&self->m_RecvDataCond); + recvdata_cc.signal_locked(recv_gl); } /* * Set EPOLL_IN to wakeup any thread waiting on epoll @@ -4837,12 +4840,10 @@ void *CUDT::tsbpd(void *param) * Schedule wakeup when it will be. */ self->m_bTsbPdAckWakeup = false; - THREAD_PAUSED(); HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << current_pkt_seq << " T=" << FormatTime(tsbpdtime) << " - waiting " << count_milliseconds(timediff) << "ms"); - SyncEvent::wait_for(&self->m_RcvTsbPdCond, &self->m_RecvLock, timediff); - THREAD_RESUMED(); + tsbpd_cc.wait_for(timediff); } else { @@ -4859,12 +4860,10 @@ void *CUDT::tsbpd(void *param) */ HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: no data, scheduling wakeup at ack"); self->m_bTsbPdAckWakeup = true; - THREAD_PAUSED(); - pthread_cond_wait(&self->m_RcvTsbPdCond, &self->m_RecvLock); - THREAD_RESUMED(); + tsbpd_cc.wait(); } } - CGuard::leaveCS(self->m_RecvLock); + // m_RecvLock will be unlocked in ~CGuard. THREAD_EXIT(); HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: EXITING"); return NULL; @@ -5476,6 +5475,8 @@ int CUDT::receiveBuffer(char *data, int len) throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); } + CSync rcond (m_RecvDataCond, recvguard); + CSync tscond (m_RcvTsbPdCond, recvguard); if (!m_pRcvBuffer->isRcvDataReady()) { if (!m_bSynRecving) @@ -5490,7 +5491,7 @@ int CUDT::receiveBuffer(char *data, int len) while (stillConnected() && !m_pRcvBuffer->isRcvDataReady()) { // Do not block forever, check connection status each 1 sec. - SyncEvent::wait_for(&m_RecvDataCond, &m_RecvLock, seconds_from(1)); + rcond.wait_for(seconds_from(1)); } } else @@ -5498,7 +5499,7 @@ int CUDT::receiveBuffer(char *data, int len) const steady_clock::time_point exptime = steady_clock::now() + milliseconds_from(m_iRcvTimeOut); while (stillConnected() && !m_pRcvBuffer->isRcvDataReady()) { - SyncEvent::wait_for(&m_RecvDataCond, &m_RecvLock, milliseconds_from(m_iRcvTimeOut)); + rcond.wait_for(milliseconds_from(m_iRcvTimeOut)); if (steady_clock::now() >= exptime) break; } @@ -5531,7 +5532,7 @@ int CUDT::receiveBuffer(char *data, int len) if (m_bTsbPd) { HLOGP(tslog.Debug, "Ping TSBPD thread to schedule wakeup"); - pthread_cond_signal(&m_RcvTsbPdCond); + tscond.signal_locked(recvguard); } if (!m_pRcvBuffer->isRcvDataReady()) @@ -5735,12 +5736,13 @@ int CUDT::sendmsg2(const char *data, int len, ref_t r_mctrl) { // wait here during a blocking sending - CGuard sendblock_lock(m_SendBlockLock); + CGuard sendblock_lock (m_SendBlockLock); + CSync sendcond (m_SendBlockCond, sendblock_lock); if (m_iSndTimeOut < 0) { while (stillConnected() && sndBuffersLeft() < minlen && m_bPeerHealth) - pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock); + sendcond.wait(); } else { @@ -5748,7 +5750,7 @@ int CUDT::sendmsg2(const char *data, int len, ref_t r_mctrl) while (stillConnected() && sndBuffersLeft() < minlen && m_bPeerHealth && exptime > steady_clock::now()) { - SyncEvent::wait_for(&m_SendBlockCond, &m_SendBlockLock, milliseconds_from(m_iSndTimeOut)); + sendcond.wait_for(milliseconds_from(m_iSndTimeOut)); } } } @@ -5908,7 +5910,8 @@ int CUDT::receiveMessage(char *data, int len, ref_t r_mctrl) if (!m_CongCtl->checkTransArgs(SrtCongestion::STA_MESSAGE, SrtCongestion::STAD_RECV, data, len, -1, false)) throw CUDTException(MJ_NOTSUP, MN_INVALMSGAPI, 0); - CGuard recvguard(m_RecvLock); + CGuard recvguard (m_RecvLock); + CSync tscond (m_RcvTsbPdCond, recvguard); /* XXX DEBUG STUFF - enable when required char charbool[2] = {'0', '1'}; @@ -5930,7 +5933,7 @@ int CUDT::receiveMessage(char *data, int len, ref_t r_mctrl) /* Kick TsbPd thread to schedule next wakeup (if running) */ if (m_bTsbPd) - pthread_cond_signal(&m_RcvTsbPdCond); + tscond.signal_locked(recvguard); if (!m_pRcvBuffer->isRcvDataReady()) { @@ -5958,7 +5961,7 @@ int CUDT::receiveMessage(char *data, int len, ref_t r_mctrl) // Kick TsbPd thread to schedule next wakeup (if running) if (m_bTsbPd) - pthread_cond_signal(&m_RcvTsbPdCond); + tscond.signal_locked(recvguard); // Shut up EPoll if no more messages in non-blocking mode s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false); @@ -5970,7 +5973,7 @@ int CUDT::receiveMessage(char *data, int len, ref_t r_mctrl) { // Kick TsbPd thread to schedule next wakeup (if running) if (m_bTsbPd) - pthread_cond_signal(&m_RcvTsbPdCond); + tscond.signal_locked(recvguard); // Shut up EPoll if no more messages in non-blocking mode s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false); @@ -5990,6 +5993,8 @@ int CUDT::receiveMessage(char *data, int len, ref_t r_mctrl) // Do not block forever, check connection status each 1 sec. const steady_clock::duration recv_timeout = m_iRcvTimeOut < 0 ? seconds_from(1) : milliseconds_from(m_iRcvTimeOut); + CSync recv_cond (m_RecvDataCond, recvguard); + do { if (stillConnected() && !timeout && (!m_pRcvBuffer->isRcvDataReady())) @@ -5997,13 +6002,13 @@ int CUDT::receiveMessage(char *data, int len, ref_t r_mctrl) /* Kick TsbPd thread to schedule next wakeup (if running) */ if (m_bTsbPd) { - HLOGP(tslog.Debug, "recvmsg: KICK tsbpd()"); - pthread_cond_signal(&m_RcvTsbPdCond); + HLOGP(tslog.Debug, "receiveMessage: KICK tsbpd"); + tscond.signal_locked(recvguard); } do { - if (SyncEvent::wait_for(&m_RecvDataCond, &m_RecvLock, recv_timeout) == ETIMEDOUT) + if (!recv_cond.wait_for(recv_timeout)) { if (!(m_iRcvTimeOut < 0)) timeout = true; @@ -6047,7 +6052,7 @@ int CUDT::receiveMessage(char *data, int len, ref_t r_mctrl) if (m_bTsbPd) { HLOGP(tslog.Debug, "recvmsg: KICK tsbpd() (buffer empty)"); - pthread_cond_signal(&m_RcvTsbPdCond); + tscond.signal_locked(recvguard); } // Shut up EPoll if no more messages in non-blocking mode @@ -6135,10 +6140,11 @@ int64_t CUDT::sendfile(fstream &ifs, int64_t &offset, int64_t size, int block) unitsize = int((tosend >= block) ? block : tosend); { - CGuard lk(m_SendBlockLock); + CGuard lk (m_SendBlockLock); + CSync sendcond (m_SendBlockCond, lk); while (stillConnected() && (sndBuffersLeft() <= 0) && m_bPeerHealth) - pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock); + sendcond.wait(); } if (m_bBroken || m_bClosing) @@ -6265,10 +6271,13 @@ int64_t CUDT::recvfile(fstream &ofs, int64_t &offset, int64_t size, int block) throw CUDTException(MJ_FILESYSTEM, MN_WRITEFAIL); } - pthread_mutex_lock(&m_RecvDataLock); - while (stillConnected() && !m_pRcvBuffer->isRcvDataReady()) - pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock); - pthread_mutex_unlock(&m_RecvDataLock); + { + CGuard gl (m_RecvDataLock); + CSync rcond (m_RecvDataCond, gl); + + while (stillConnected() && !m_pRcvBuffer->isRcvDataReady()) + rcond.wait(); + } if (!m_bConnected) throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0); @@ -6664,20 +6673,14 @@ void CUDT::destroySynch() void CUDT::releaseSynch() { // wake up user calls - pthread_mutex_lock(&m_SendBlockLock); - pthread_cond_signal(&m_SendBlockCond); - pthread_mutex_unlock(&m_SendBlockLock); + CSync::lock_signal(m_SendBlockCond, m_SendBlockLock); pthread_mutex_lock(&m_SendLock); pthread_mutex_unlock(&m_SendLock); - pthread_mutex_lock(&m_RecvDataLock); - pthread_cond_signal(&m_RecvDataCond); - pthread_mutex_unlock(&m_RecvDataLock); + CSync::lock_signal(m_RecvDataCond, m_RecvDataLock); - pthread_mutex_lock(&m_RecvLock); - pthread_cond_signal(&m_RcvTsbPdCond); - pthread_mutex_unlock(&m_RecvLock); + CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock); pthread_mutex_lock(&m_RecvDataLock); if (!pthread_equal(m_RcvTsbPdThread, pthread_t())) @@ -6804,19 +6807,17 @@ void CUDT::sendCtrl(UDTMessageType pkttype, const void *lparam, void *rparam, in if (m_bTsbPd) { /* Newly acknowledged data, signal TsbPD thread */ - pthread_mutex_lock(&m_RecvLock); + CGuard rlock (m_RecvLock); + CSync cc (m_RcvTsbPdCond, rlock); if (m_bTsbPdAckWakeup) - pthread_cond_signal(&m_RcvTsbPdCond); - pthread_mutex_unlock(&m_RecvLock); + cc.signal_locked(rlock); } else { if (m_bSynRecving) { // signal a waiting "recv" call if there is any data available - pthread_mutex_lock(&m_RecvDataLock); - pthread_cond_signal(&m_RecvDataCond); - pthread_mutex_unlock(&m_RecvDataLock); + CSync::lock_signal(m_RecvDataCond, m_RecvDataLock); } // acknowledge any waiting epolls to read s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, true); @@ -7064,8 +7065,7 @@ void CUDT::updateSndLossListOnACK(int32_t ackdata_seqno) if (m_bSynSending) { - CGuard lk(m_SendBlockLock); - pthread_cond_signal(&m_SendBlockCond); + CSync::lock_signal(m_SendBlockCond, m_SendBlockLock); } const steady_clock::time_point currtime = steady_clock::now(); @@ -7991,8 +7991,7 @@ void CUDT::processClose() if (m_bTsbPd) { HLOGP(mglog.Debug, "processClose: lock-and-signal TSBPD"); - CGuard rl(m_RecvLock); - pthread_cond_signal(&m_RcvTsbPdCond); + CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock); } // Signal the sender and recver if they are waiting for data. @@ -8427,9 +8426,8 @@ int CUDT::processData(CUnit *in_unit) if (m_bTsbPd) { - pthread_mutex_lock(&m_RecvLock); - pthread_cond_signal(&m_RcvTsbPdCond); - pthread_mutex_unlock(&m_RecvLock); + HLOGC(mglog.Debug, log << "loss: signaling TSBPD cond"); + CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock); } } @@ -8445,9 +8443,8 @@ int CUDT::processData(CUnit *in_unit) if (m_bTsbPd) { - pthread_mutex_lock(&m_RecvLock); - pthread_cond_signal(&m_RcvTsbPdCond); - pthread_mutex_unlock(&m_RecvLock); + HLOGC(mglog.Debug, log << "loss: signaling TSBPD cond"); + CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock); } } diff --git a/srtcore/packetfilter.cpp b/srtcore/packetfilter.cpp index 99dd4de78..2e4170d1d 100644 --- a/srtcore/packetfilter.cpp +++ b/srtcore/packetfilter.cpp @@ -23,6 +23,7 @@ using namespace std; using namespace srt_logging; +using namespace srt::sync; bool ParseFilterConfig(std::string s, SrtFilterConfig& out) { diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index d53338f36..8b695318e 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -419,9 +419,7 @@ void CSndUList::insert_norealloc_(const steady_clock::time_point& ts, const CUDT // first entry, activate the sending queue if (0 == m_iLastEntry) { - pthread_mutex_lock(m_pWindowLock); - pthread_cond_signal(m_pWindowCond); - pthread_mutex_unlock(m_pWindowLock); + CSync::lock_signal(*m_pWindowCond, *m_pWindowLock); } } @@ -487,9 +485,8 @@ CSndQueue::~CSndQueue() m_pTimer->interrupt(); } - pthread_mutex_lock(&m_WindowLock); - pthread_cond_signal(&m_WindowCond); - pthread_mutex_unlock(&m_WindowLock); + CSync::lock_signal(m_WindowCond, m_WindowLock); + if (!pthread_equal(m_WorkerThread, pthread_t())) pthread_join(m_WorkerThread, NULL); pthread_cond_destroy(&m_WindowCond); @@ -547,19 +544,18 @@ void *CSndQueue::worker(void *param) self->m_WorkerStats.lNotReadyTs++; #endif /* SRT_DEBUG_SNDQ_HIGHRATE */ + CGuard windlock (self->m_WindowLock); + CSync windsync (self->m_WindowCond, windlock); + // wait here if there is no sockets with data to be sent - THREAD_PAUSED(); - pthread_mutex_lock(&self->m_WindowLock); if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0)) { - pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock); + windsync.wait(); #if defined(SRT_DEBUG_SNDQ_HIGHRATE) self->m_WorkerStats.lCondWait++; #endif /* SRT_DEBUG_SNDQ_HIGHRATE */ } - THREAD_RESUMED(); - pthread_mutex_unlock(&self->m_WindowLock); continue; } @@ -1543,14 +1539,15 @@ EConnectStatus CRcvQueue::worker_TryAsyncRend_OrStore(int32_t id, CUnit *unit, c int CRcvQueue::recvfrom(int32_t id, ref_t r_packet) { - CGuard bufferlock(m_PassLock); + CGuard bufferlock (m_PassLock); + CSync passcond (m_PassCond, bufferlock); CPacket &packet = *r_packet; map >::iterator i = m_mBuffer.find(id); if (i == m_mBuffer.end()) { - SyncEvent::wait_for(&m_PassCond, &m_PassLock, seconds_from(1)); + passcond.wait_for(seconds_from(1)); i = m_mBuffer.find(id); if (i == m_mBuffer.end()) @@ -1665,14 +1662,15 @@ CUDT *CRcvQueue::getNewEntry() void CRcvQueue::storePkt(int32_t id, CPacket *pkt) { - CGuard bufferlock(m_PassLock); + CGuard bufferlock (m_PassLock); + CSync passcond (m_PassCond, bufferlock); map >::iterator i = m_mBuffer.find(id); if (i == m_mBuffer.end()) { m_mBuffer[id].push(pkt); - pthread_cond_signal(&m_PassCond); + passcond.signal_locked(bufferlock); } else { diff --git a/srtcore/sync.cpp b/srtcore/sync.cpp index 6d76cead4..52b700fb5 100644 --- a/srtcore/sync.cpp +++ b/srtcore/sync.cpp @@ -98,6 +98,176 @@ int64_t get_cpu_frequency() const int64_t s_cpu_frequency = get_cpu_frequency(); + +// Automatically lock in constructor +CGuard::CGuard(pthread_mutex_t& lock, bool shouldwork): + m_Mutex(lock), + m_iLocked(-1) +{ + if (shouldwork) + m_iLocked = pthread_mutex_lock(&m_Mutex); +} + +// Automatically unlock in destructor +CGuard::~CGuard() +{ + if (m_iLocked == 0) + pthread_mutex_unlock(&m_Mutex); +} + +// After calling this on a scoped lock wrapper (CGuard), +// the mutex will be unlocked right now, and no longer +// in destructor +void CGuard::forceUnlock() +{ + if (m_iLocked == 0) + { + pthread_mutex_unlock(&m_Mutex); + m_iLocked = -1; + } +} + +int CGuard::enterCS(pthread_mutex_t& lock) +{ + return pthread_mutex_lock(&lock); +} + +int CGuard::leaveCS(pthread_mutex_t& lock) +{ + return pthread_mutex_unlock(&lock); +} + +void CGuard::createMutex(pthread_mutex_t& lock) +{ + pthread_mutex_init(&lock, NULL); +} + +void CGuard::releaseMutex(pthread_mutex_t& lock) +{ + pthread_mutex_destroy(&lock); +} + +void CGuard::createCond(pthread_cond_t& cond) +{ + pthread_cond_init(&cond, NULL); +} + +void CGuard::releaseCond(pthread_cond_t& cond) +{ + pthread_cond_destroy(&cond); +} + + + +CSync::CSync(pthread_cond_t& cond, CGuard& g) + : m_cond(&cond), m_mutex(&g.m_Mutex) +{ + // XXX it would be nice to check whether the owner is also current thread + // but this can't be done portable way. + + // When constructed by this constructor, the user is expected + // to only call signal_locked() function. You should pass the same guard + // variable that you have used for construction as its argument. +} + +CSync::CSync(pthread_cond_t& cond, pthread_mutex_t& mutex, Nolock) + : m_cond(&cond) + , m_mutex(&mutex) +{ + // We expect that the mutex is NOT locked at this moment by the current thread, + // but it is perfectly ok, if the mutex is locked by another thread. We'll just wait. + + // When constructed by this constructor, the user is expected + // to only call lock_signal() function. +} + +void CSync::wait() +{ + THREAD_PAUSED(); + pthread_cond_wait(&(*m_cond), &(*m_mutex)); + THREAD_RESUMED(); +} + +bool CSync::wait_until(const steady_clock::time_point& exptime) +{ + // This will work regardless as to which clock is in use. The time + // should be specified as steady_clock::time_point, so there's no + // question of the timer base. + steady_clock::time_point now = steady_clock::now(); + if (now >= exptime) + return false; // timeout + + THREAD_PAUSED(); + bool signaled = SyncEvent::wait_for(m_cond, m_mutex, exptime - now) != ETIMEDOUT; + THREAD_RESUMED(); + + return signaled; +} + +/// Block the call until either @a timestamp time achieved +/// or the conditional is signaled. +/// @param [in] delay Maximum time to wait since the moment of the call +/// @retval true Resumed due to getting a CV signal +/// @retval false Resumed due to being past @a timestamp +bool CSync::wait_for(const steady_clock::duration& delay) +{ + // Note: this is implemented this way because the pthread API + // does not provide a possibility to wait relative time. When + // you implement it for different API that does provide relative + /// time waiting, you may want to implement it better way. + + THREAD_PAUSED(); + bool signaled = SyncEvent::wait_for(m_cond, m_mutex, delay) != ETIMEDOUT; + THREAD_RESUMED(); + + return signaled; +} + +void CSync::lock_signal() +{ + // We expect m_nolock == true. + lock_signal(*m_cond, *m_mutex); +} + +void CSync::lock_signal(pthread_cond_t& cond, pthread_mutex_t& mutex) +{ + // Not using CGuard here because it would be logged + // and this will result in unnecessary excessive logging. + pthread_mutex_lock(&(mutex)); + pthread_cond_signal(&(cond)); + pthread_mutex_unlock(&(mutex)); +} + +void CSync::lock_broadcast(pthread_cond_t& cond, pthread_mutex_t& mutex) +{ + // Not using CGuard here because it would be logged + // and this will result in unnecessary excessive logging. + pthread_mutex_lock(&(mutex)); + pthread_cond_broadcast(&(cond)); + pthread_mutex_unlock(&(mutex)); +} + +void CSync::signal_locked(CGuard& lk SRT_ATR_UNUSED) +{ + // We expect m_nolock == false. + pthread_cond_signal(&(*m_cond)); +} + +void CSync::signal_relaxed() +{ + signal_relaxed(*m_cond); +} + +void CSync::signal_relaxed(pthread_cond_t& cond) +{ + pthread_cond_signal(&(cond)); +} + +void CSync::broadcast_relaxed(pthread_cond_t& cond) +{ + pthread_cond_broadcast(&(cond)); +} + } // namespace sync } // namespace srt @@ -107,6 +277,14 @@ uint64_t srt::sync::TimePoint::us_since_epoch() const return m_timestamp / s_cpu_frequency; } +timespec srt::sync::us_to_timespec(const uint64_t time_us) +{ + timespec timeout; + timeout.tv_sec = time_us / 1000000; + timeout.tv_nsec = (time_us % 1000000) * 1000; + return timeout; +} + template <> srt::sync::Duration srt::sync::TimePoint::time_since_epoch() const { @@ -194,9 +372,8 @@ int srt::sync::SyncEvent::wait_for(pthread_cond_t* cond, pthread_mutex_t* mutex, timespec timeout; timeval now; gettimeofday(&now, 0); - const uint64_t time_us = now.tv_sec * uint64_t(1000000) + now.tv_usec + count_microseconds(rel_time); - timeout.tv_sec = time_us / 1000000; - timeout.tv_nsec = (time_us % 1000000) * 1000; + const uint64_t now_us = now.tv_sec * uint64_t(1000000) + now.tv_usec; + timeout = us_to_timespec(now_us + count_microseconds(rel_time)); return pthread_cond_timedwait(cond, mutex, &timeout); } @@ -206,10 +383,8 @@ int srt::sync::SyncEvent::wait_for_monotonic(pthread_cond_t* cond, pthread_mutex { timespec timeout; clock_gettime(CLOCK_MONOTONIC, &timeout); - const uint64_t time_us = - timeout.tv_sec * uint64_t(1000000) + (timeout.tv_nsec / 1000) + count_microseconds(rel_time); - timeout.tv_sec = time_us / 1000000; - timeout.tv_nsec = (time_us % 1000000) * 1000; + const uint64_t now_us = timeout.tv_sec * uint64_t(1000000) + (timeout.tv_nsec / 1000); + timeout = us_to_timespec(now_us + count_microseconds(rel_time)); return pthread_cond_timedwait(cond, mutex, &timeout); } diff --git a/srtcore/sync.h b/srtcore/sync.h index 7dcc3d47f..9351a29aa 100644 --- a/srtcore/sync.h +++ b/srtcore/sync.h @@ -174,6 +174,9 @@ Duration seconds_from(int64_t t_s); inline bool is_zero(const TimePoint& t) { return t.is_zero(); } +timespec us_to_timespec(const uint64_t time_us); + + /////////////////////////////////////////////////////////////////////////////// // // Common pthread/chrono section @@ -197,6 +200,118 @@ class SyncEvent static int wait_for_monotonic(pthread_cond_t* cond, pthread_mutex_t* mutex, const steady_clock::duration& rel_time); }; + +class CGuard +{ +public: + /// Constructs CGuard, which locks the given mutex for + /// the scope where this object exists. + /// @param lock Mutex to lock + /// @param if_condition If this is false, CGuard will do completely nothing + CGuard(pthread_mutex_t& lock, bool if_condition = true); + ~CGuard(); + +public: + static int enterCS(pthread_mutex_t& lock); + static int leaveCS(pthread_mutex_t& lock); + + static void createMutex(pthread_mutex_t& lock); + static void releaseMutex(pthread_mutex_t& lock); + + static void createCond(pthread_cond_t& cond); + static void releaseCond(pthread_cond_t& cond); + + void forceUnlock(); + +private: + friend class CSync; + + pthread_mutex_t& m_Mutex; // Alias name of the mutex to be protected + int m_iLocked; // Locking status + + CGuard& operator=(const CGuard&); +}; + +class InvertedGuard +{ + pthread_mutex_t* m_pMutex; +public: + + InvertedGuard(pthread_mutex_t* smutex): m_pMutex(smutex) + { + if ( !smutex ) + return; + + CGuard::leaveCS(*smutex); + } + + ~InvertedGuard() + { + if ( !m_pMutex ) + return; + + CGuard::enterCS(*m_pMutex); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +// This class is used for condition variable combined with mutex by different ways. +// This should provide a cleaner API around locking with debug-logging inside. +class CSync +{ + pthread_cond_t* m_cond; + pthread_mutex_t* m_mutex; + +public: + enum Nolock { NOLOCK }; + + // Locked version: must be declared only after the declaration of CGuard, + // which has locked the mutex. On this delegate you should call only + // signal_locked() and pass the CGuard variable that should remain locked. + // Also wait() and wait_for() can be used only with this socket. + CSync(pthread_cond_t& cond, CGuard& g); + + // This is only for one-shot signaling. This doesn't need a CGuard + // variable, only the mutex itself. Only lock_signal() can be used. + CSync(pthread_cond_t& cond, pthread_mutex_t& mutex, Nolock); + + // An alternative method + static CSync nolock(pthread_cond_t& cond, pthread_mutex_t& m) + { + return CSync(cond, m, NOLOCK); + } + + // COPY CONSTRUCTOR: DEFAULT! + + // Wait indefinitely, until getting a signal on CV. + void wait(); + + // Wait only for a given time delay (in microseconds). This function + // extracts first current time using steady_clock::now(). + bool wait_for(const steady_clock::duration& delay); + + // Wait until the given time is achieved. This actually + // refers to wait_for for the time remaining to achieve + // given time. + bool wait_until(const steady_clock::time_point& exptime); + + // You can signal using two methods: + // - lock_signal: expect the mutex NOT locked, lock it, signal, then unlock. + // - signal: expect the mutex locked, so only issue a signal, but you must pass the CGuard that keeps the lock. + void lock_signal(); + + // Static ad-hoc version + static void lock_signal(pthread_cond_t& cond, pthread_mutex_t& m); + static void lock_broadcast(pthread_cond_t& cond, pthread_mutex_t& m); + + void signal_locked(CGuard& lk); + void signal_relaxed(); + static void signal_relaxed(pthread_cond_t& cond); + static void broadcast_relaxed(pthread_cond_t& cond); +}; + + /// Print steady clock timepoint in a human readable way. /// days HH:MM::SS.us [STD] /// Example: 1D 02:12:56.123456 diff --git a/srtcore/window.h b/srtcore/window.h index a4f42f669..a1c2b71ec 100644 --- a/srtcore/window.h +++ b/srtcore/window.h @@ -170,7 +170,7 @@ class CPktTimeWindow: CPktTimeWindowTools int getPktRcvSpeed(ref_t bytesps) const { // Lock access to the packet Window - CGuard cg(m_lockPktWindow); + srt::sync::CGuard cg(m_lockPktWindow); int pktReplica[ASIZE]; // packet information window (inter-packet time) return getPktRcvSpeed_in(m_aPktWindow, pktReplica, m_aBytesWindow, ASIZE, *bytesps); @@ -188,7 +188,7 @@ class CPktTimeWindow: CPktTimeWindowTools int getBandwidth() const { // Lock access to the packet Window - CGuard cg(m_lockProbeWindow); + srt::sync::CGuard cg(m_lockProbeWindow); int probeReplica[PSIZE]; return getBandwidth_in(m_aProbeWindow, probeReplica, PSIZE); @@ -211,7 +211,7 @@ class CPktTimeWindow: CPktTimeWindowTools void onPktArrival(int pktsz = 0) { - CGuard cg(m_lockPktWindow); + srt::sync::CGuard cg(m_lockPktWindow); m_tsCurrArrTime = srt::sync::steady_clock::now(); @@ -286,7 +286,7 @@ class CPktTimeWindow: CPktTimeWindowTools const srt::sync::steady_clock::time_point now = srt::sync::steady_clock::now(); // Lock access to the packet Window - CGuard cg(m_lockProbeWindow); + srt::sync::CGuard cg(m_lockProbeWindow); m_tsCurrArrTime = now; From 59cc73bfc74fa658db5cefcfe7fc7f2b88f72163 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Wed, 8 Jan 2020 19:32:26 +0100 Subject: [PATCH 2/6] Extracted part for thread wrappers --- srtcore/api.cpp | 99 +++++++++++----------- srtcore/api.h | 18 ++-- srtcore/buffer.cpp | 10 +-- srtcore/buffer.h | 6 +- srtcore/cache.h | 10 +-- srtcore/core.cpp | 202 ++++++++++++++++++++++---------------------- srtcore/core.h | 24 +++--- srtcore/epoll.cpp | 4 +- srtcore/epoll.h | 4 +- srtcore/list.cpp | 4 +- srtcore/list.h | 2 +- srtcore/queue.cpp | 41 ++++----- srtcore/queue.h | 20 ++--- srtcore/sync.cpp | 140 +++++++++++++++++++++--------- srtcore/sync.h | 133 +++++++++++++++++++++-------- srtcore/utilities.h | 16 ++++ srtcore/window.h | 12 +-- test/test_sync.cpp | 126 ++++++++++++--------------- 18 files changed, 494 insertions(+), 377 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index bbe97972d..620bfa44d 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -97,9 +97,9 @@ m_AcceptLock(), m_uiBackLog(0), m_iMuxID(-1) { - pthread_mutex_init(&m_AcceptLock, NULL); - pthread_cond_init(&m_AcceptCond, NULL); - pthread_mutex_init(&m_ControlLock, NULL); + createMutex(m_AcceptLock, "Accept"); + createCond(m_AcceptCond, "Accept"); + createMutex(m_ControlLock, "Control"); } CUDTSocket::~CUDTSocket() @@ -121,9 +121,9 @@ CUDTSocket::~CUDTSocket() delete m_pQueuedSockets; delete m_pAcceptSockets; - pthread_mutex_destroy(&m_AcceptLock); - pthread_cond_destroy(&m_AcceptCond); - pthread_mutex_destroy(&m_ControlLock); + releaseMutex(m_AcceptLock); + releaseCond(m_AcceptCond); + releaseMutex(m_ControlLock); } //////////////////////////////////////////////////////////////////////////////// @@ -156,9 +156,9 @@ m_ClosedSockets() srand((unsigned int)t.tv_usec); m_SocketIDGenerator = 1 + (int)((1 << 30) * (double(rand()) / RAND_MAX)); - pthread_mutex_init(&m_GlobControlLock, NULL); - pthread_mutex_init(&m_IDLock, NULL); - pthread_mutex_init(&m_InitLock, NULL); + createMutex(m_GlobControlLock, "GlobControl"); + createMutex(m_IDLock, "ID"); + createMutex(m_InitLock, "Init"); pthread_key_create(&m_TLSError, TLSDestroy); @@ -175,9 +175,9 @@ CUDTUnited::~CUDTUnited() cleanup(); } - pthread_mutex_destroy(&m_GlobControlLock); - pthread_mutex_destroy(&m_IDLock); - pthread_mutex_destroy(&m_InitLock); + releaseMutex(m_GlobControlLock); + releaseMutex(m_IDLock); + releaseMutex(m_InitLock); delete (CUDTException*)pthread_getspecific(m_TLSError); pthread_key_delete(m_TLSError); @@ -220,15 +220,8 @@ int CUDTUnited::startup() return true; m_bClosing = false; - pthread_mutex_init(&m_GCStopLock, NULL); -#if ENABLE_MONOTONIC_CLOCK - pthread_condattr_t CondAttribs; - pthread_condattr_init(&CondAttribs); - pthread_condattr_setclock(&CondAttribs, CLOCK_MONOTONIC); - pthread_cond_init(&m_GCStopCond, &CondAttribs); -#else - pthread_cond_init(&m_GCStopCond, NULL); -#endif + createMutex(m_GCStopLock, "GCStop"); + createCond_monotonic(m_GCStopCond, "GCStop"); { ThreadName tn("SRT:GC"); pthread_create(&m_GCThread, NULL, garbageCollect, this); @@ -252,8 +245,13 @@ int CUDTUnited::cleanup() return 0; m_bClosing = true; - pthread_cond_signal(&m_GCStopCond); - pthread_join(m_GCThread, NULL); + // NOTE: we can do relaxed signaling here because + // waiting on m_GCStopCond has a 1-second timeout, + // after which the m_bClosing flag is cheched, which + // is set here above. Worst case secenario, this + // jointhread() call will block for 1 second. + CSync::signal_relaxed(m_GCStopCond); + jointhread(m_GCThread); // XXX There's some weird bug here causing this // to hangup on Windows. This might be either something @@ -262,8 +260,8 @@ int CUDTUnited::cleanup() // tolerated with simply exit the application without cleanup, // counting on that the system will take care of it anyway. #ifndef _WIN32 - pthread_mutex_destroy(&m_GCStopLock); - pthread_cond_destroy(&m_GCStopCond); + releaseMutex(m_GCStopLock); + releaseCond(m_GCStopCond); #endif m_bGCStatus = false; @@ -535,7 +533,7 @@ int CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr* peer, CHan SRTSOCKET id = ns->m_SocketID; ns->m_pUDT->close(); ns->m_Status = SRTS_CLOSED; - ns->m_tsClosureTimeStamp = srt::sync::steady_clock::now(); + ns->m_tsClosureTimeStamp = steady_clock::now(); // The mapped socket should be now unmapped to preserve the situation that // was in the original UDT code. // In SRT additionally the acceptAndRespond() function (it was called probably @@ -551,9 +549,7 @@ int CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr* peer, CHan } // wake up a waiting accept() call - pthread_mutex_lock(&(ls->m_AcceptLock)); - pthread_cond_signal(&(ls->m_AcceptCond)); - pthread_mutex_unlock(&(ls->m_AcceptLock)); + CSync::lock_signal(ls->m_AcceptCond, ls->m_AcceptLock); return 1; } @@ -778,6 +774,7 @@ SRTSOCKET CUDTUnited::accept(const SRTSOCKET listen, sockaddr* addr, int* addrle while (!accepted) { CGuard cg(ls->m_AcceptLock); + CSync axcond(ls->m_AcceptCond, cg); if ((ls->m_Status != SRTS_LISTENING) || ls->m_pUDT->m_bBroken) { @@ -816,7 +813,7 @@ SRTSOCKET CUDTUnited::accept(const SRTSOCKET listen, sockaddr* addr, int* addrle } if (!accepted && (ls->m_Status == SRTS_LISTENING)) - pthread_cond_wait(&(ls->m_AcceptCond), &(ls->m_AcceptLock)); + axcond.wait(); if (ls->m_pQueuedSockets->empty()) m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false); @@ -905,7 +902,7 @@ int CUDTUnited::connect(const SRTSOCKET u, const sockaddr* name, int namelen, in { // InvertedGuard unlocks in the constructor, then locks in the // destructor, no matter if an exception has fired. - InvertedGuard l_unlocker( s->m_pUDT->m_bSynRecving ? &s->m_ControlLock : 0 ); + InvertedGuard l_unlocker (s->m_ControlLock, s->m_pUDT->m_bSynRecving); s->m_pUDT->startConnect(name, forced_isn); } catch (CUDTException& e) // Interceptor, just to change the state. @@ -972,10 +969,7 @@ int CUDTUnited::close(const SRTSOCKET u) } // broadcast all "accept" waiting - pthread_mutex_lock(&(s->m_AcceptLock)); - pthread_cond_broadcast(&(s->m_AcceptCond)); - pthread_mutex_unlock(&(s->m_AcceptLock)); - + CSync::lock_broadcast(s->m_AcceptCond, s->m_AcceptLock); } else { @@ -1557,17 +1551,17 @@ void CUDTUnited::checkBrokenSockets() // asynchronous close: if ((!j->second->m_pUDT->m_pSndBuffer) || (0 == j->second->m_pUDT->m_pSndBuffer->getCurrBufSize()) - || (j->second->m_pUDT->m_tsLingerExpiration <= srt::sync::steady_clock::now())) + || (j->second->m_pUDT->m_tsLingerExpiration <= steady_clock::now())) { j->second->m_pUDT->m_tsLingerExpiration = steady_clock::time_point(); j->second->m_pUDT->m_bClosing = true; - j->second->m_tsClosureTimeStamp = srt::sync::steady_clock::now(); + j->second->m_tsClosureTimeStamp = steady_clock::now(); } } // timeout 1 second to destroy a socket AND it has been removed from // RcvUList - if ((srt::sync::steady_clock::now() - j->second->m_tsClosureTimeStamp > seconds_from(1)) + if ((steady_clock::now() - j->second->m_tsClosureTimeStamp > seconds_from(1)) && ((!j->second->m_pUDT->m_pRNode) || !j->second->m_pUDT->m_pRNode->m_bOnList)) { @@ -1607,7 +1601,7 @@ void CUDTUnited::removeSocket(const SRTSOCKET u) { m_Sockets[*q]->m_pUDT->m_bBroken = true; m_Sockets[*q]->m_pUDT->close(); - m_Sockets[*q]->m_tsClosureTimeStamp = srt::sync::steady_clock::now(); + m_Sockets[*q]->m_tsClosureTimeStamp = steady_clock::now(); m_Sockets[*q]->m_Status = SRTS_CLOSED; m_ClosedSockets[*q] = m_Sockets[*q]; m_Sockets.erase(*q); @@ -1877,6 +1871,7 @@ void* CUDTUnited::garbageCollect(void* p) THREAD_STATE_INIT("SRT:GC"); CGuard gcguard(self->m_GCStopLock); + CSync gcsync(self->m_GCStopCond, gcguard); while (!self->m_bClosing) { @@ -1884,7 +1879,7 @@ void* CUDTUnited::garbageCollect(void* p) self->checkBrokenSockets(); HLOGC(mglog.Debug, log << "GC: sleep 1 s"); - SyncEvent::wait_for_monotonic(&self->m_GCStopCond, &self->m_GCStopLock, seconds_from(1)); + gcsync.wait_for_monotonic(seconds_from(1)); } // remove all sockets and multiplexers @@ -3156,54 +3151,62 @@ SRT_SOCKSTATUS getsockstate(SRTSOCKET u) void setloglevel(LogLevel::type ll) { - CGuard gg(srt_logger_config.mutex); + srt_logger_config.lock(); srt_logger_config.max_level = ll; + srt_logger_config.unlock(); } void addlogfa(LogFA fa) { - CGuard gg(srt_logger_config.mutex); + srt_logger_config.lock(); srt_logger_config.enabled_fa.set(fa, true); + srt_logger_config.unlock(); } void dellogfa(LogFA fa) { - CGuard gg(srt_logger_config.mutex); + srt_logger_config.lock(); srt_logger_config.enabled_fa.set(fa, false); + srt_logger_config.unlock(); } void resetlogfa(set fas) { - CGuard gg(srt_logger_config.mutex); + srt_logger_config.lock(); for (int i = 0; i <= SRT_LOGFA_LASTNONE; ++i) srt_logger_config.enabled_fa.set(i, fas.count(i)); + srt_logger_config.unlock(); } void resetlogfa(const int* fara, size_t fara_size) { - CGuard gg(srt_logger_config.mutex); + srt_logger_config.lock(); srt_logger_config.enabled_fa.reset(); for (const int* i = fara; i != fara + fara_size; ++i) srt_logger_config.enabled_fa.set(*i, true); + srt_logger_config.unlock(); } void setlogstream(std::ostream& stream) { - CGuard gg(srt_logger_config.mutex); + srt_logger_config.lock(); srt_logger_config.log_stream = &stream; + srt_logger_config.unlock(); } void setloghandler(void* opaque, SRT_LOG_HANDLER_FN* handler) { - CGuard gg(srt_logger_config.mutex); + srt_logger_config.lock(); srt_logger_config.loghandler_opaque = opaque; srt_logger_config.loghandler_fn = handler; + srt_logger_config.unlock(); } void setlogflags(int flags) { - CGuard gg(srt_logger_config.mutex); + srt_logger_config.lock(); srt_logger_config.flags = flags; + srt_logger_config.unlock(); } SRT_API bool setstreamid(SRTSOCKET u, const std::string& sid) diff --git a/srtcore/api.h b/srtcore/api.h index 948b76d42..4c223575d 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -97,14 +97,14 @@ class CUDTSocket std::set* m_pQueuedSockets; //< set of connections waiting for accept() std::set* m_pAcceptSockets; //< set of accept()ed connections - pthread_cond_t m_AcceptCond; //< used to block "accept" call - pthread_mutex_t m_AcceptLock; //< mutex associated to m_AcceptCond + srt::sync::CCondition m_AcceptCond; //< used to block "accept" call + srt::sync::CMutex m_AcceptLock; //< mutex associated to m_AcceptCond unsigned int m_uiBackLog; //< maximum number of connections in queue int m_iMuxID; //< multiplexer ID - pthread_mutex_t m_ControlLock; //< lock this socket exclusively for control APIs: bind/listen/connect + srt::sync::CMutex m_ControlLock; //< lock this socket exclusively for control APIs: bind/listen/connect static int64_t getPeerSpec(SRTSOCKET id, int32_t isn) { @@ -215,9 +215,9 @@ friend class CRendezvousQueue; private: std::map m_Sockets; // stores all the socket structures - pthread_mutex_t m_GlobControlLock; // used to synchronize UDT API + srt::sync::CMutex m_GlobControlLock; // used to synchronize UDT API - pthread_mutex_t m_IDLock; // used to synchronize ID generation + srt::sync::CMutex m_IDLock; // used to synchronize ID generation SRTSOCKET m_SocketIDGenerator; // seed to generate a new unique socket ID std::map > m_PeerRec;// record sockets from peers to avoid repeated connection request, int64_t = (socker_id << 30) + isn @@ -234,17 +234,17 @@ friend class CRendezvousQueue; private: std::map m_mMultiplexer; // UDP multiplexer - pthread_mutex_t m_MultiplexerLock; + srt::sync::CMutex m_MultiplexerLock; private: CCache* m_pCache; // UDT network information cache private: volatile bool m_bClosing; - pthread_mutex_t m_GCStopLock; - pthread_cond_t m_GCStopCond; + srt::sync::CMutex m_GCStopLock; + srt::sync::CCondition m_GCStopCond; - pthread_mutex_t m_InitLock; + srt::sync::CMutex m_InitLock; int m_iInstanceCount; // number of startup() called by application bool m_bGCStatus; // if the GC thread is working (true) diff --git a/srtcore/buffer.cpp b/srtcore/buffer.cpp index 774a16203..f79acee56 100644 --- a/srtcore/buffer.cpp +++ b/srtcore/buffer.cpp @@ -113,7 +113,7 @@ CSndBuffer::CSndBuffer(int size, int mss) m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock; - pthread_mutex_init(&m_BufLock, NULL); + createMutex(m_BufLock, "Buf"); } CSndBuffer::~CSndBuffer() @@ -135,7 +135,7 @@ CSndBuffer::~CSndBuffer() delete temp; } - pthread_mutex_destroy(&m_BufLock); + releaseMutex(m_BufLock); } void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order, uint64_t srctime, ref_t r_msgno) @@ -712,7 +712,7 @@ m_iNotch(0) memset(m_TsbPdDriftHisto1ms, 0, sizeof(m_TsbPdDriftHisto1ms)); #endif - pthread_mutex_init(&m_BytesCountLock, NULL); + createMutex(m_BytesCountLock, "BytesCount"); } CRcvBuffer::~CRcvBuffer() @@ -727,7 +727,7 @@ CRcvBuffer::~CRcvBuffer() delete [] m_pUnit; - pthread_mutex_destroy(&m_BytesCountLock); + releaseMutex(m_BytesCountLock); } void CRcvBuffer::countBytes(int pkts, int bytes, bool acked) @@ -1556,7 +1556,7 @@ void CRcvBuffer::printDriftOffset(int tsbPdOffset, int tsbPdDriftAvg) } #endif /* SRT_DEBUG_TSBPD_DRIFT */ -void CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us, pthread_mutex_t& mutex_to_lock) +void CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us, CMutex& mutex_to_lock) { if (!m_bTsbPdMode) // Not checked unless in TSBPD mode return; diff --git a/srtcore/buffer.h b/srtcore/buffer.h index 247d6fe65..33b711838 100644 --- a/srtcore/buffer.h +++ b/srtcore/buffer.h @@ -161,7 +161,7 @@ class CSndBuffer static const int INPUTRATE_INITIAL_BYTESPS = BW_INFINITE; private: - pthread_mutex_t m_BufLock; // used to synchronize buffer operation + srt::sync::CMutex m_BufLock; // used to synchronize buffer operation struct Block { @@ -376,7 +376,7 @@ class CRcvBuffer /// @param [in] timestamp packet time stamp /// @param [ref] lock Mutex that should be locked for the operation - void addRcvTsbPdDriftSample(uint32_t timestamp, pthread_mutex_t& lock); + void addRcvTsbPdDriftSample(uint32_t timestamp, srt::sync::CMutex& lock); #ifdef SRT_DEBUG_TSBPD_DRIFT void printDriftHistogram(int64_t iDrift); @@ -514,7 +514,7 @@ class CRcvBuffer // up to which data are already retrieved; // in message reading mode it's unused and always 0) - pthread_mutex_t m_BytesCountLock; // used to protect counters operations + srt::sync::CMutex m_BytesCountLock; // used to protect counters operations int m_iBytesCount; // Number of payload bytes in the buffer int m_iAckedPktsCount; // Number of acknowledged pkts in the buffer int m_iAckedBytesCount; // Number of acknowledged payload bytes in the buffer diff --git a/srtcore/cache.h b/srtcore/cache.h index 141fe26c6..8db220736 100644 --- a/srtcore/cache.h +++ b/srtcore/cache.h @@ -82,13 +82,13 @@ template class CCache m_iCurrSize(0) { m_vHashPtr.resize(m_iHashSize); - srt::sync::CGuard::createMutex(m_Lock); + srt::sync::createMutex(m_Lock, "Cache"); } ~CCache() { clear(); - srt::sync::CGuard::releaseMutex(m_Lock); + srt::sync::releaseMutex(m_Lock); } public: @@ -98,7 +98,7 @@ template class CCache int lookup(T* data) { - srt::sync::CGuard cacheguard(m_Lock); + srt::sync::CGuard cacheguard(m_Lock); int key = data->getKey(); if (key < 0) @@ -126,7 +126,7 @@ template class CCache int update(T* data) { - srt::sync::CGuard cacheguard(m_Lock); + srt::sync::CGuard cacheguard(m_Lock); int key = data->getKey(); if (key < 0) @@ -223,7 +223,7 @@ template class CCache int m_iHashSize; int m_iCurrSize; - pthread_mutex_t m_Lock; + srt::sync::CMutex m_Lock; private: CCache(const CCache&); diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 645a61546..4d99db9cb 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -1055,7 +1055,7 @@ void CUDT::getOpt(SRT_SOCKOPT optName, void *optval, int &optlen) if (m_pRcvBuffer) { CGuard::enterCS(m_RecvLock); - *(int32_t *)optval = m_pRcvBuffer->getRcvDataSize(); + *(int32_t*)optval = m_pRcvBuffer->getRcvDataSize(); CGuard::leaveCS(m_RecvLock); } else @@ -5408,12 +5408,12 @@ bool CUDT::close() m_bConnected = false; } - if (m_bTsbPd && !pthread_equal(m_RcvTsbPdThread, pthread_t())) + if (m_bTsbPd && isthread(m_RcvTsbPdThread)) { HLOGC(mglog.Debug, log << "CLOSING, joining TSBPD thread..."); - void *retval; - int ret SRT_ATR_UNUSED = pthread_join(m_RcvTsbPdThread, &retval); - HLOGC(mglog.Debug, log << "... " << (ret == 0 ? "SUCCEEDED" : "FAILED")); + void* retval; + bool ret SRT_ATR_UNUSED = jointhread(m_RcvTsbPdThread, retval); + HLOGC(mglog.Debug, log << "... " << (ret ? "SUCCEEDED" : "FAILED")); } HLOGC(mglog.Debug, log << "CLOSING, joining send/receive threads"); @@ -6413,7 +6413,7 @@ void CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous) perf->mbpsBandwidth = Bps2Mbps(availbw * (m_iMaxSRTPayloadSize + pktHdrSize)); - if (pthread_mutex_trylock(&m_ConnectionLock) == 0) + if (CGuard::enterCS(m_ConnectionLock, false) == 0) { if (m_pSndBuffer) { @@ -6473,7 +6473,7 @@ void CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous) //< } - pthread_mutex_unlock(&m_ConnectionLock); + CGuard::leaveCS(m_ConnectionLock); } else { @@ -6638,36 +6638,36 @@ bool CUDT::updateCC(ETransmissionEvent evt, EventVariant arg) void CUDT::initSynch() { - pthread_mutex_init(&m_SendBlockLock, NULL); - pthread_cond_init(&m_SendBlockCond, NULL); - pthread_mutex_init(&m_RecvDataLock, NULL); - pthread_cond_init(&m_RecvDataCond, NULL); - pthread_mutex_init(&m_SendLock, NULL); - pthread_mutex_init(&m_RecvLock, NULL); - pthread_mutex_init(&m_RcvLossLock, NULL); - pthread_mutex_init(&m_RecvAckLock, NULL); - pthread_mutex_init(&m_RcvBufferLock, NULL); - pthread_mutex_init(&m_ConnectionLock, NULL); - pthread_mutex_init(&m_StatsLock, NULL); + createMutex(m_SendBlockLock, "SendBlock"); + createCond(m_SendBlockCond, "SendBlock"); + createMutex(m_RecvDataLock, "RecvData"); + createCond(m_RecvDataCond, "RecvData"); + createMutex(m_SendLock, "Send"); + createMutex(m_RecvLock, "Recv"); + createMutex(m_RcvLossLock, "RcvLoss"); + createMutex(m_RecvAckLock, "RecvAck"); + createMutex(m_RcvBufferLock, "RcvBuffer"); + createMutex(m_ConnectionLock, "Connection"); + createMutex(m_StatsLock, "Stats"); memset(&m_RcvTsbPdThread, 0, sizeof m_RcvTsbPdThread); - pthread_cond_init(&m_RcvTsbPdCond, NULL); + createCond(m_RcvTsbPdCond, "RcvTsbPd"); } void CUDT::destroySynch() { - pthread_mutex_destroy(&m_SendBlockLock); - pthread_cond_destroy(&m_SendBlockCond); - pthread_mutex_destroy(&m_RecvDataLock); - pthread_cond_destroy(&m_RecvDataCond); - pthread_mutex_destroy(&m_SendLock); - pthread_mutex_destroy(&m_RecvLock); - pthread_mutex_destroy(&m_RcvLossLock); - pthread_mutex_destroy(&m_RecvAckLock); - pthread_mutex_destroy(&m_RcvBufferLock); - pthread_mutex_destroy(&m_ConnectionLock); - pthread_mutex_destroy(&m_StatsLock); - pthread_cond_destroy(&m_RcvTsbPdCond); + releaseMutex(m_SendBlockLock); + releaseCond(m_SendBlockCond); + releaseMutex(m_RecvDataLock); + releaseCond(m_RecvDataCond); + releaseMutex(m_SendLock); + releaseMutex(m_RecvLock); + releaseMutex(m_RcvLossLock); + releaseMutex(m_RecvAckLock); + releaseMutex(m_RcvBufferLock); + releaseMutex(m_ConnectionLock); + releaseMutex(m_StatsLock); + releaseCond(m_RcvTsbPdCond); } void CUDT::releaseSynch() @@ -6675,23 +6675,22 @@ void CUDT::releaseSynch() // wake up user calls CSync::lock_signal(m_SendBlockCond, m_SendBlockLock); - pthread_mutex_lock(&m_SendLock); - pthread_mutex_unlock(&m_SendLock); + CGuard::enterCS(m_SendLock); + CGuard::leaveCS(m_SendLock); CSync::lock_signal(m_RecvDataCond, m_RecvDataLock); CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock); - pthread_mutex_lock(&m_RecvDataLock); - if (!pthread_equal(m_RcvTsbPdThread, pthread_t())) + CGuard::enterCS(m_RecvDataLock); + if (isthread(m_RcvTsbPdThread)) { - pthread_join(m_RcvTsbPdThread, NULL); - m_RcvTsbPdThread = pthread_t(); + jointhread(m_RcvTsbPdThread); } - pthread_mutex_unlock(&m_RecvDataLock); + CGuard::leaveCS(m_RecvDataLock); - pthread_mutex_lock(&m_RecvLock); - pthread_mutex_unlock(&m_RecvLock); + CGuard::enterCS(m_RecvLock); + CGuard::leaveCS(m_RecvLock); } #if ENABLE_HEAVY_LOGGING @@ -7323,83 +7322,80 @@ void CUDT::processCtrl(CPacket &ctrlpkt) bool secure = true; - // protect packet retransmission - CGuard::enterCS(m_RecvAckLock); - // This variable is used in "normal" logs, so it may cause a warning // when logging is forcefully off. int32_t wrong_loss SRT_ATR_UNUSED = CSeqNo::m_iMaxSeqNo; - // decode loss list message and insert loss into the sender loss list - for (int i = 0, n = (int)(ctrlpkt.getLength() / 4); i < n; ++i) + // protect packet retransmission { - if (IsSet(losslist[i], LOSSDATA_SEQNO_RANGE_FIRST)) - { - // Then it's this is a specification with HI in a consecutive cell. - int32_t losslist_lo = SEQNO_VALUE::unwrap(losslist[i]); - int32_t losslist_hi = losslist[i + 1]; - // specification means that the consecutive cell has been already interpreted. - ++i; + CGuard ack_lock (m_RecvAckLock); - HLOGF(mglog.Debug, - "received UMSG_LOSSREPORT: %d-%d (%d packets)...", - losslist_lo, - losslist_hi, - CSeqNo::seqoff(losslist_lo, losslist_hi) + 1); - - if ((CSeqNo::seqcmp(losslist_lo, losslist_hi) > 0) || - (CSeqNo::seqcmp(losslist_hi, m_iSndCurrSeqNo) > 0)) + // decode loss list message and insert loss into the sender loss list + for (int i = 0, n = (int)(ctrlpkt.getLength() / 4); i < n; ++i) + { + if (IsSet(losslist[i], LOSSDATA_SEQNO_RANGE_FIRST)) { - // seq_a must not be greater than seq_b; seq_b must not be greater than the most recent sent seq - secure = false; - wrong_loss = losslist_hi; - // XXX leaveCS: really necessary? 'break' will break the 'for' loop, not the 'switch' statement. - // and the leaveCS is done again next to the 'for' loop end. - CGuard::leaveCS(m_RecvAckLock); - break; - } + // Then it's this is a specification with HI in a consecutive cell. + int32_t losslist_lo = SEQNO_VALUE::unwrap(losslist[i]); + int32_t losslist_hi = losslist[i + 1]; + // specification means that the consecutive cell has been already interpreted. + ++i; - int num = 0; - if (CSeqNo::seqcmp(losslist_lo, m_iSndLastAck) >= 0) - num = m_pSndLossList->insert(losslist_lo, losslist_hi); - else if (CSeqNo::seqcmp(losslist_hi, m_iSndLastAck) >= 0) - { - // This should be theoretically impossible because this would mean - // that the received packet loss report informs about the loss that predates - // the ACK sequence. - // However, this can happen if the packet reordering has caused the earlier sent - // LOSSREPORT will be delivered after later sent ACK. Whatever, ACK should be - // more important, so simply drop the part that predates ACK. - num = m_pSndLossList->insert(m_iSndLastAck, losslist_hi); - } + HLOGF(mglog.Debug, + "received UMSG_LOSSREPORT: %d-%d (%d packets)...", + losslist_lo, + losslist_hi, + CSeqNo::seqoff(losslist_lo, losslist_hi) + 1); - CGuard::enterCS(m_StatsLock); - m_stats.traceSndLoss += num; - m_stats.sndLossTotal += num; - CGuard::leaveCS(m_StatsLock); - } - else if (CSeqNo::seqcmp(losslist[i], m_iSndLastAck) >= 0) - { - HLOGF(mglog.Debug, "received UMSG_LOSSREPORT: %d (1 packet)...", losslist[i]); + if ((CSeqNo::seqcmp(losslist_lo, losslist_hi) > 0) || + (CSeqNo::seqcmp(losslist_hi, m_iSndCurrSeqNo) > 0)) + { + // seq_a must not be greater than seq_b; seq_b must not be greater than the most recent sent seq + secure = false; + wrong_loss = losslist_hi; + break; + } - if (CSeqNo::seqcmp(losslist[i], m_iSndCurrSeqNo) > 0) - { - // seq_a must not be greater than the most recent sent seq - secure = false; - wrong_loss = losslist[i]; - CGuard::leaveCS(m_RecvAckLock); - break; + int num = 0; + if (CSeqNo::seqcmp(losslist_lo, m_iSndLastAck) >= 0) + num = m_pSndLossList->insert(losslist_lo, losslist_hi); + else if (CSeqNo::seqcmp(losslist_hi, m_iSndLastAck) >= 0) + { + // This should be theoretically impossible because this would mean + // that the received packet loss report informs about the loss that predates + // the ACK sequence. + // However, this can happen if the packet reordering has caused the earlier sent + // LOSSREPORT will be delivered after later sent ACK. Whatever, ACK should be + // more important, so simply drop the part that predates ACK. + num = m_pSndLossList->insert(m_iSndLastAck, losslist_hi); + } + + CGuard::enterCS(m_StatsLock); + m_stats.traceSndLoss += num; + m_stats.sndLossTotal += num; + CGuard::leaveCS(m_StatsLock); } + else if (CSeqNo::seqcmp(losslist[i], m_iSndLastAck) >= 0) + { + HLOGF(mglog.Debug, "received UMSG_LOSSREPORT: %d (1 packet)...", losslist[i]); - int num = m_pSndLossList->insert(losslist[i], losslist[i]); + if (CSeqNo::seqcmp(losslist[i], m_iSndCurrSeqNo) > 0) + { + // seq_a must not be greater than the most recent sent seq + secure = false; + wrong_loss = losslist[i]; + break; + } - CGuard::enterCS(m_StatsLock); - m_stats.traceSndLoss += num; - m_stats.sndLossTotal += num; - CGuard::leaveCS(m_StatsLock); + int num = m_pSndLossList->insert(losslist[i], losslist[i]); + + CGuard::enterCS(m_StatsLock); + m_stats.traceSndLoss += num; + m_stats.sndLossTotal += num; + CGuard::leaveCS(m_StatsLock); + } } } - CGuard::leaveCS(m_RecvAckLock); updateCC(TEV_LOSSREPORT, EventVariant(losslist, losslist_len)); @@ -8046,7 +8042,7 @@ int CUDT::processData(CUnit *in_unit) m_tsLastRspTime = steady_clock::now(); // We are receiving data, start tsbpd thread if TsbPd is enabled - if (m_bTsbPd && pthread_equal(m_RcvTsbPdThread, pthread_t())) + if (m_bTsbPd && !isthread(m_RcvTsbPdThread)) { HLOGP(mglog.Debug, "Spawning TSBPD thread"); int st = 0; diff --git a/srtcore/core.h b/srtcore/core.h index 8fb6ccfc7..9a148f29f 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -692,7 +692,7 @@ class CUDT bool m_bTsbPd; // Peer sends TimeStamp-Based Packet Delivery Packets pthread_t m_RcvTsbPdThread; // Rcv TsbPD Thread handle - pthread_cond_t m_RcvTsbPdCond; + srt::sync::CCondition m_RcvTsbPdCond; bool m_bTsbPdAckWakeup; // Signal TsbPd thread on Ack sent CallbackHolder m_cbAcceptHook; @@ -711,26 +711,26 @@ class CUDT private: // synchronization: mutexes and conditions - pthread_mutex_t m_ConnectionLock; // used to synchronize connection operation + srt::sync::CMutex m_ConnectionLock; // used to synchronize connection operation - pthread_cond_t m_SendBlockCond; // used to block "send" call - pthread_mutex_t m_SendBlockLock; // lock associated to m_SendBlockCond + srt::sync::CCondition m_SendBlockCond; // used to block "send" call + srt::sync::CMutex m_SendBlockLock; // lock associated to m_SendBlockCond - pthread_mutex_t m_RcvBufferLock; // Protects the state of the m_pRcvBuffer + srt::sync::CMutex m_RcvBufferLock; // Protects the state of the m_pRcvBuffer // Protects access to m_iSndCurrSeqNo, m_iSndLastAck - pthread_mutex_t m_RecvAckLock; // Protects the state changes while processing incomming ACK (UDT_EPOLL_OUT) + srt::sync::CMutex m_RecvAckLock; // Protects the state changes while processing incomming ACK (UDT_EPOLL_OUT) - pthread_cond_t m_RecvDataCond; // used to block "recv" when there is no data - pthread_mutex_t m_RecvDataLock; // lock associated to m_RecvDataCond + srt::sync::CCondition m_RecvDataCond; // used to block "recv" when there is no data + srt::sync::CMutex m_RecvDataLock; // lock associated to m_RecvDataCond - pthread_mutex_t m_SendLock; // used to synchronize "send" call - pthread_mutex_t m_RecvLock; // used to synchronize "recv" call + srt::sync::CMutex m_SendLock; // used to synchronize "send" call + srt::sync::CMutex m_RecvLock; // used to synchronize "recv" call - pthread_mutex_t m_RcvLossLock; // Protects the receiver loss list (access: CRcvQueue::worker, CUDT::tsbpd) + srt::sync::CMutex m_RcvLossLock; // Protects the receiver loss list (access: CRcvQueue::worker, CUDT::tsbpd) - pthread_mutex_t m_StatsLock; // used to synchronize access to trace statistics + srt::sync::CMutex m_StatsLock; // used to synchronize access to trace statistics void initSynch(); void destroySynch(); diff --git a/srtcore/epoll.cpp b/srtcore/epoll.cpp index c39375958..0ff347fda 100644 --- a/srtcore/epoll.cpp +++ b/srtcore/epoll.cpp @@ -80,12 +80,12 @@ using namespace srt_logging; CEPoll::CEPoll(): m_iIDSeed(0) { - CGuard::createMutex(m_EPollLock); + createMutex(m_EPollLock, "EPoll"); } CEPoll::~CEPoll() { - CGuard::releaseMutex(m_EPollLock); + releaseMutex(m_EPollLock); } int CEPoll::create() diff --git a/srtcore/epoll.h b/srtcore/epoll.h index 1d0463ffc..83f328061 100755 --- a/srtcore/epoll.h +++ b/srtcore/epoll.h @@ -385,10 +385,10 @@ friend class CRendezvousQueue; private: int m_iIDSeed; // seed to generate a new ID - pthread_mutex_t m_SeedLock; + srt::sync::CMutex m_SeedLock; std::map m_mPolls; // all epolls - pthread_mutex_t m_EPollLock; + srt::sync::CMutex m_EPollLock; }; diff --git a/srtcore/list.cpp b/srtcore/list.cpp index b0faf6836..2ac15fecf 100644 --- a/srtcore/list.cpp +++ b/srtcore/list.cpp @@ -77,13 +77,13 @@ m_ListLock() } // sender list needs mutex protection - pthread_mutex_init(&m_ListLock, 0); + createMutex(m_ListLock, "LossList"); } CSndLossList::~CSndLossList() { delete [] m_caSeq; - pthread_mutex_destroy(&m_ListLock); + releaseMutex(m_ListLock); } int CSndLossList::insert(int32_t seqno1, int32_t seqno2) diff --git a/srtcore/list.h b/srtcore/list.h index a1052a532..5f282203b 100644 --- a/srtcore/list.h +++ b/srtcore/list.h @@ -99,7 +99,7 @@ class CSndLossList int m_iSize; // size of the static array int m_iLastInsertPos; // position of last insert node - mutable pthread_mutex_t m_ListLock; // used to synchronize list operation + mutable srt::sync::CMutex m_ListLock; // used to synchronize list operation private: CSndLossList(const CSndLossList&); diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 69536aae7..0ef514efe 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -261,13 +261,13 @@ CSndUList::CSndUList() , m_pTimer(NULL) { m_pHeap = new CSNode *[m_iArrayLength]; - pthread_mutex_init(&m_ListLock, NULL); + createMutex(m_ListLock, "List"); } CSndUList::~CSndUList() { delete[] m_pHeap; - pthread_mutex_destroy(&m_ListLock); + releaseMutex(m_ListLock); } void CSndUList::update(const CUDT *u, EReschedule reschedule) @@ -471,8 +471,8 @@ CSndQueue::CSndQueue() , m_WindowCond() , m_bClosing(false) { - pthread_cond_init(&m_WindowCond, NULL); - pthread_mutex_init(&m_WindowLock, NULL); + createCond(m_WindowCond, "Window"); + createMutex(m_WindowLock, "Window"); } CSndQueue::~CSndQueue() @@ -486,10 +486,11 @@ CSndQueue::~CSndQueue() CSync::lock_signal(m_WindowCond, m_WindowLock); - if (!pthread_equal(m_WorkerThread, pthread_t())) - pthread_join(m_WorkerThread, NULL); - pthread_cond_destroy(&m_WindowCond); - pthread_mutex_destroy(&m_WindowLock); + if (isthread(m_WorkerThread)) + jointhread(m_WorkerThread); + + releaseCond(m_WindowCond); + releaseMutex(m_WindowLock); delete m_pSndUList; } @@ -808,12 +809,12 @@ CRendezvousQueue::CRendezvousQueue() : m_lRendezvousID() , m_RIDVectorLock() { - pthread_mutex_init(&m_RIDVectorLock, NULL); + createMutex(m_RIDVectorLock, "RIDVector"); } CRendezvousQueue::~CRendezvousQueue() { - pthread_mutex_destroy(&m_RIDVectorLock); + releaseMutex(m_RIDVectorLock); for (list::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i) { @@ -1044,21 +1045,21 @@ CRcvQueue::CRcvQueue() , m_PassLock() , m_PassCond() { - pthread_mutex_init(&m_PassLock, NULL); - pthread_cond_init(&m_PassCond, NULL); - pthread_mutex_init(&m_LSLock, NULL); - pthread_mutex_init(&m_IDLock, NULL); + createMutex(m_PassLock, "Pass"); + createCond(m_PassCond, "Pass"); + createMutex(m_LSLock, "LS"); + createMutex(m_IDLock, "ID"); } CRcvQueue::~CRcvQueue() { m_bClosing = true; - if (!pthread_equal(m_WorkerThread, pthread_t())) - pthread_join(m_WorkerThread, NULL); - pthread_mutex_destroy(&m_PassLock); - pthread_cond_destroy(&m_PassCond); - pthread_mutex_destroy(&m_LSLock); - pthread_mutex_destroy(&m_IDLock); + if (isthread(m_WorkerThread)) + jointhread(m_WorkerThread); + releaseMutex(m_PassLock); + releaseCond(m_PassCond); + releaseMutex(m_LSLock); + releaseMutex(m_IDLock); delete m_pRcvUList; delete m_pHash; diff --git a/srtcore/queue.h b/srtcore/queue.h index 41da624b6..5c58fcd7f 100644 --- a/srtcore/queue.h +++ b/srtcore/queue.h @@ -215,10 +215,10 @@ friend class CSndQueue; int m_iArrayLength; // physical length of the array int m_iLastEntry; // position of last entry on the heap array - pthread_mutex_t m_ListLock; + srt::sync::CMutex m_ListLock; - pthread_mutex_t* m_pWindowLock; - pthread_cond_t* m_pWindowCond; + srt::sync::CMutex* m_pWindowLock; + srt::sync::CCondition* m_pWindowCond; CTimer* m_pTimer; @@ -348,7 +348,7 @@ class CRendezvousQueue }; std::list m_lRendezvousID; // The sockets currently in rendezvous mode - pthread_mutex_t m_RIDVectorLock; + srt::sync::CMutex m_RIDVectorLock; }; class CSndQueue @@ -412,8 +412,8 @@ friend class CUDTUnited; CChannel* m_pChannel; // The UDP channel for data sending CTimer* m_pTimer; // Timing facility - pthread_mutex_t m_WindowLock; - pthread_cond_t m_WindowCond; + srt::sync::CMutex m_WindowLock; + srt::sync::CCondition m_WindowCond; volatile bool m_bClosing; // closing the worker @@ -509,16 +509,16 @@ friend class CUDTUnited; void storePkt(int32_t id, CPacket* pkt); private: - pthread_mutex_t m_LSLock; + srt::sync::CMutex m_LSLock; CUDT* m_pListener; // pointer to the (unique, if any) listening UDT entity CRendezvousQueue* m_pRendezvousQueue; // The list of sockets in rendezvous mode std::vector m_vNewEntry; // newly added entries, to be inserted - pthread_mutex_t m_IDLock; + srt::sync::CMutex m_IDLock; std::map > m_mBuffer; // temporary buffer for rendezvous connection request - pthread_mutex_t m_PassLock; - pthread_cond_t m_PassCond; + srt::sync::CMutex m_PassLock; + srt::sync::CCondition m_PassCond; private: CRcvQueue(const CRcvQueue&); diff --git a/srtcore/sync.cpp b/srtcore/sync.cpp index 52b700fb5..102067e97 100644 --- a/srtcore/sync.cpp +++ b/srtcore/sync.cpp @@ -100,7 +100,7 @@ const int64_t s_cpu_frequency = get_cpu_frequency(); // Automatically lock in constructor -CGuard::CGuard(pthread_mutex_t& lock, bool shouldwork): +CGuard::CGuard(pthread_mutex_t& lock, explicit_t shouldwork): m_Mutex(lock), m_iLocked(-1) { @@ -115,51 +115,86 @@ CGuard::~CGuard() pthread_mutex_unlock(&m_Mutex); } -// After calling this on a scoped lock wrapper (CGuard), -// the mutex will be unlocked right now, and no longer -// in destructor -void CGuard::forceUnlock() +int CGuard::enterCS(CMutex& lock, explicit_t block) { - if (m_iLocked == 0) + int retval; + if (block) { - pthread_mutex_unlock(&m_Mutex); - m_iLocked = -1; + retval = pthread_mutex_lock(RawAddr(lock)); } + else + { + retval = pthread_mutex_trylock(RawAddr(lock)); + } + return retval; +} + +int CGuard::leaveCS(CMutex& lock) +{ + return pthread_mutex_unlock(RawAddr(lock)); +} + +/// This function checks if the given thread id +/// is a thread id, stating that a thread id variable +/// that doesn't hold a running thread, is equal to +/// a null thread (pthread_t()). +bool isthread(const pthread_t& thr) +{ + return pthread_equal(thr, pthread_t()) == 0; // NOT equal to a null thread } -int CGuard::enterCS(pthread_mutex_t& lock) +bool jointhread(pthread_t& thr) { - return pthread_mutex_lock(&lock); + int ret = pthread_join(thr, NULL); + thr = pthread_t(); // prevent dangling + return ret == 0; } -int CGuard::leaveCS(pthread_mutex_t& lock) +bool jointhread(pthread_t& thr, void*& result) { - return pthread_mutex_unlock(&lock); + int ret = pthread_join(thr, &result); + thr = pthread_t(); + return ret == 0; } -void CGuard::createMutex(pthread_mutex_t& lock) +void createMutex(CMutex& lock, const char* name SRT_ATR_UNUSED) { - pthread_mutex_init(&lock, NULL); + pthread_mutexattr_t* pattr = NULL; + pthread_mutex_init(RawAddr(lock), pattr); } -void CGuard::releaseMutex(pthread_mutex_t& lock) +void releaseMutex(CMutex& lock) { - pthread_mutex_destroy(&lock); + pthread_mutex_destroy(RawAddr(lock)); } -void CGuard::createCond(pthread_cond_t& cond) +void createCond(CCondition& cond, const char* name SRT_ATR_UNUSED) { - pthread_cond_init(&cond, NULL); + + pthread_condattr_t* pattr = NULL; + pthread_cond_init(RawAddr(cond), pattr); } -void CGuard::releaseCond(pthread_cond_t& cond) +void createCond_monotonic(CCondition& cond, const char* name SRT_ATR_UNUSED) { - pthread_cond_destroy(&cond); + + pthread_condattr_t* pattr = NULL; +#if ENABLE_MONOTONIC_CLOCK + pthread_condattr_t CondAttribs; + pthread_condattr_init(&CondAttribs); + pthread_condattr_setclock(&CondAttribs, CLOCK_MONOTONIC); + pattr = &CondAttribs; +#endif + pthread_cond_init(RawAddr(cond), pattr); } +void releaseCond(CCondition& cond) +{ + pthread_cond_destroy(RawAddr(cond)); +} -CSync::CSync(pthread_cond_t& cond, CGuard& g) +CSync::CSync(CCondition& cond, CGuard& g) : m_cond(&cond), m_mutex(&g.m_Mutex) { // XXX it would be nice to check whether the owner is also current thread @@ -170,7 +205,7 @@ CSync::CSync(pthread_cond_t& cond, CGuard& g) // variable that you have used for construction as its argument. } -CSync::CSync(pthread_cond_t& cond, pthread_mutex_t& mutex, Nolock) +CSync::CSync(CCondition& cond, CMutex& mutex, Nolock) : m_cond(&cond) , m_mutex(&mutex) { @@ -184,7 +219,7 @@ CSync::CSync(pthread_cond_t& cond, pthread_mutex_t& mutex, Nolock) void CSync::wait() { THREAD_PAUSED(); - pthread_cond_wait(&(*m_cond), &(*m_mutex)); + pthread_cond_wait(RawAddr(*m_cond), RawAddr(*m_mutex)); THREAD_RESUMED(); } @@ -198,7 +233,7 @@ bool CSync::wait_until(const steady_clock::time_point& exptime) return false; // timeout THREAD_PAUSED(); - bool signaled = SyncEvent::wait_for(m_cond, m_mutex, exptime - now) != ETIMEDOUT; + bool signaled = CondWaitFor(m_cond, m_mutex, exptime - now) != ETIMEDOUT; THREAD_RESUMED(); return signaled; @@ -217,40 +252,60 @@ bool CSync::wait_for(const steady_clock::duration& delay) /// time waiting, you may want to implement it better way. THREAD_PAUSED(); - bool signaled = SyncEvent::wait_for(m_cond, m_mutex, delay) != ETIMEDOUT; + bool signaled = CondWaitFor(m_cond, m_mutex, delay) != ETIMEDOUT; THREAD_RESUMED(); return signaled; } +/// Block the call until either @a timestamp time achieved +/// or the conditional is signaled. +/// @param [in] delay Maximum time to wait since the moment of the call +/// @retval true Resumed due to getting a CV signal +/// @retval false Resumed due to being past @a timestamp +bool CSync::wait_for_monotonic(const steady_clock::duration& delay) +{ + // Note: this is implemented this way because the pthread API + // does not provide a possibility to wait relative time. When + // you implement it for different API that does provide relative + /// time waiting, you may want to implement it better way. + + THREAD_PAUSED(); + bool signaled = CondWaitFor_monotonic(m_cond, m_mutex, delay) != ETIMEDOUT; + THREAD_RESUMED(); + + return signaled; +} + + void CSync::lock_signal() { // We expect m_nolock == true. lock_signal(*m_cond, *m_mutex); } -void CSync::lock_signal(pthread_cond_t& cond, pthread_mutex_t& mutex) +void CSync::lock_signal(CCondition& cond, CMutex& mutex) { // Not using CGuard here because it would be logged // and this will result in unnecessary excessive logging. - pthread_mutex_lock(&(mutex)); - pthread_cond_signal(&(cond)); - pthread_mutex_unlock(&(mutex)); + pthread_mutex_lock(RawAddr(mutex)); + pthread_cond_signal(RawAddr(cond)); + pthread_mutex_unlock(RawAddr(mutex)); } -void CSync::lock_broadcast(pthread_cond_t& cond, pthread_mutex_t& mutex) +void CSync::lock_broadcast(CCondition& cond, CMutex& mutex) { // Not using CGuard here because it would be logged // and this will result in unnecessary excessive logging. - pthread_mutex_lock(&(mutex)); - pthread_cond_broadcast(&(cond)); - pthread_mutex_unlock(&(mutex)); + pthread_mutex_lock(RawAddr(mutex)); + pthread_cond_broadcast(RawAddr(cond)); + pthread_mutex_unlock(RawAddr(mutex)); } void CSync::signal_locked(CGuard& lk SRT_ATR_UNUSED) { // We expect m_nolock == false. - pthread_cond_signal(&(*m_cond)); + pthread_cond_signal(RawAddr(*m_cond)); } void CSync::signal_relaxed() @@ -258,19 +313,20 @@ void CSync::signal_relaxed() signal_relaxed(*m_cond); } -void CSync::signal_relaxed(pthread_cond_t& cond) +void CSync::signal_relaxed(CCondition& cond) { - pthread_cond_signal(&(cond)); + pthread_cond_signal(RawAddr(cond)); } -void CSync::broadcast_relaxed(pthread_cond_t& cond) +void CSync::broadcast_relaxed(CCondition& cond) { - pthread_cond_broadcast(&(cond)); + pthread_cond_broadcast(RawAddr(cond)); } } // namespace sync } // namespace srt + template <> uint64_t srt::sync::TimePoint::us_since_epoch() const { @@ -367,7 +423,7 @@ std::string srt::sync::FormatTimeSys(const steady_clock::time_point& timestamp) return out.str(); } -int srt::sync::SyncEvent::wait_for(pthread_cond_t* cond, pthread_mutex_t* mutex, const Duration& rel_time) +int srt::sync::CondWaitFor(pthread_cond_t* cond, pthread_mutex_t* mutex, const Duration& rel_time) { timespec timeout; timeval now; @@ -379,7 +435,7 @@ int srt::sync::SyncEvent::wait_for(pthread_cond_t* cond, pthread_mutex_t* mutex, } #if ENABLE_MONOTONIC_CLOCK -int srt::sync::SyncEvent::wait_for_monotonic(pthread_cond_t* cond, pthread_mutex_t* mutex, const Duration& rel_time) +int srt::sync::CondWaitFor_monotonic(pthread_cond_t* cond, pthread_mutex_t* mutex, const Duration& rel_time) { timespec timeout; clock_gettime(CLOCK_MONOTONIC, &timeout); @@ -389,8 +445,8 @@ int srt::sync::SyncEvent::wait_for_monotonic(pthread_cond_t* cond, pthread_mutex return pthread_cond_timedwait(cond, mutex, &timeout); } #else -int srt::sync::SyncEvent::wait_for_monotonic(pthread_cond_t* cond, pthread_mutex_t* mutex, const Duration& rel_time) +int srt::sync::CondWaitFor_monotonic(pthread_cond_t* cond, pthread_mutex_t* mutex, const Duration& rel_time) { - return wait_for(cond, mutex, rel_time); + return CondWaitFor(cond, mutex, rel_time); } #endif diff --git a/srtcore/sync.h b/srtcore/sync.h index 9351a29aa..24041b7cd 100644 --- a/srtcore/sync.h +++ b/srtcore/sync.h @@ -186,20 +186,17 @@ timespec us_to_timespec(const uint64_t time_us); class SyncEvent { public: - /// Atomically releases lock, blocks the current executing thread, - /// and adds it to the list of threads waiting on* this. - /// The thread will be unblocked when notify_all() or notify_one() is executed, - /// or when the relative timeout rel_time expires. - /// It may also be unblocked spuriously. - /// When unblocked, regardless of the reason, lock is reacquiredand wait_for() exits. - /// - /// @return result of pthread_cond_wait(...) function call - /// - static int wait_for(pthread_cond_t* cond, pthread_mutex_t* mutex, const steady_clock::duration& rel_time); - - static int wait_for_monotonic(pthread_cond_t* cond, pthread_mutex_t* mutex, const steady_clock::duration& rel_time); }; +typedef ::pthread_mutex_t CMutex; +typedef ::pthread_cond_t CCondition; + +// Note: This cannot be defined as overloaded for +// two different types because on some platforms +// the pthread_cond_t and pthread_mutex_t are distinct +// types, while on others they resolve to the same type. +template +inline SysObj* RawAddr(SysObj& m) { return &m; } class CGuard { @@ -208,41 +205,89 @@ class CGuard /// the scope where this object exists. /// @param lock Mutex to lock /// @param if_condition If this is false, CGuard will do completely nothing - CGuard(pthread_mutex_t& lock, bool if_condition = true); + CGuard(CMutex& lock, explicit_t if_condition = true); ~CGuard(); public: - static int enterCS(pthread_mutex_t& lock); - static int leaveCS(pthread_mutex_t& lock); - - static void createMutex(pthread_mutex_t& lock); - static void releaseMutex(pthread_mutex_t& lock); - static void createCond(pthread_cond_t& cond); - static void releaseCond(pthread_cond_t& cond); - - void forceUnlock(); + // The force-Lock/Unlock mechanism can be used to forcefully + // change the lock on the CGuard object. This is in order to + // temporarily change the lock status on the given mutex, but + // still do the right job in the destructor. For example, if + // a lock has been forcefully unlocked by forceUnlock, then + // the CGuard object will not try to unlock it in the destructor, + // but again, if the forceLock() was done again, the destructor + // will still unlock the mutex. + void forceLock() + { + if (m_iLocked == 0) + return; + Lock(); + } + + // After calling this on a scoped lock wrapper (CGuard), + // the mutex will be unlocked right now, and no longer + // in destructor + void forceUnlock() + { + if (m_iLocked == 0) + { + m_iLocked = -1; + Unlock(); + } + } + + static int enterCS(CMutex& lock, explicit_t block = true); + static int leaveCS(CMutex& lock); + + // This is for a special case when one class keeps a pointer + // to another mutex/cond in another object. Required because + // the operator& has been defined to return the internal pointer + // so that the most used syntax matches directly the raw mutex/cond types. private: friend class CSync; - pthread_mutex_t& m_Mutex; // Alias name of the mutex to be protected + void Lock() + { + m_iLocked = pthread_mutex_lock(RawAddr(m_Mutex)); + } + + void Unlock() + { + pthread_mutex_unlock(RawAddr(m_Mutex)); + } + + CMutex& m_Mutex; // Alias name of the mutex to be protected int m_iLocked; // Locking status CGuard& operator=(const CGuard&); }; +bool isthread(const pthread_t& thrval); + +bool jointhread(pthread_t& thr, void*& result); +bool jointhread(pthread_t& thr); + +void createMutex(CMutex& lock, const char* name); +void releaseMutex(CMutex& lock); + +void createCond(CCondition& cond, const char* name); +void createCond_monotonic(CCondition& cond, const char* name); +void releaseCond(CCondition& cond); + + class InvertedGuard { - pthread_mutex_t* m_pMutex; + CMutex* m_pMutex; public: - InvertedGuard(pthread_mutex_t* smutex): m_pMutex(smutex) + InvertedGuard(CMutex& smutex, bool shouldlock = true): m_pMutex() { - if ( !smutex ) + if ( !shouldlock) return; - - CGuard::leaveCS(*smutex); + m_pMutex = AddressOf(smutex); + CGuard::leaveCS(smutex); } ~InvertedGuard() @@ -256,12 +301,25 @@ class InvertedGuard //////////////////////////////////////////////////////////////////////////////// +/// Atomically releases lock, blocks the current executing thread, +/// and adds it to the list of threads waiting on* this. +/// The thread will be unblocked when notify_all() or notify_one() is executed, +/// or when the relative timeout rel_time expires. +/// It may also be unblocked spuriously. +/// When unblocked, regardless of the reason, lock is reacquiredand wait_for() exits. +/// +/// @return result of pthread_cond_wait(...) function call +/// +int CondWaitFor(pthread_cond_t* cond, pthread_mutex_t* mutex, const steady_clock::duration& rel_time); +int CondWaitFor_monotonic(pthread_cond_t* cond, pthread_mutex_t* mutex, const steady_clock::duration& rel_time); + // This class is used for condition variable combined with mutex by different ways. // This should provide a cleaner API around locking with debug-logging inside. class CSync { - pthread_cond_t* m_cond; - pthread_mutex_t* m_mutex; + CCondition* m_cond; + CMutex* m_mutex; + public: enum Nolock { NOLOCK }; @@ -270,14 +328,14 @@ class CSync // which has locked the mutex. On this delegate you should call only // signal_locked() and pass the CGuard variable that should remain locked. // Also wait() and wait_for() can be used only with this socket. - CSync(pthread_cond_t& cond, CGuard& g); + CSync(CCondition& cond, CGuard& g); // This is only for one-shot signaling. This doesn't need a CGuard // variable, only the mutex itself. Only lock_signal() can be used. - CSync(pthread_cond_t& cond, pthread_mutex_t& mutex, Nolock); + CSync(CCondition& cond, CMutex& mutex, Nolock); // An alternative method - static CSync nolock(pthread_cond_t& cond, pthread_mutex_t& m) + static CSync nolock(CCondition& cond, CMutex& m) { return CSync(cond, m, NOLOCK); } @@ -290,6 +348,7 @@ class CSync // Wait only for a given time delay (in microseconds). This function // extracts first current time using steady_clock::now(). bool wait_for(const steady_clock::duration& delay); + bool wait_for_monotonic(const steady_clock::duration& delay); // Wait until the given time is achieved. This actually // refers to wait_for for the time remaining to achieve @@ -302,13 +361,13 @@ class CSync void lock_signal(); // Static ad-hoc version - static void lock_signal(pthread_cond_t& cond, pthread_mutex_t& m); - static void lock_broadcast(pthread_cond_t& cond, pthread_mutex_t& m); + static void lock_signal(CCondition& cond, CMutex& m); + static void lock_broadcast(CCondition& cond, CMutex& m); void signal_locked(CGuard& lk); void signal_relaxed(); - static void signal_relaxed(pthread_cond_t& cond); - static void broadcast_relaxed(pthread_cond_t& cond); + static void signal_relaxed(CCondition& cond); + static void broadcast_relaxed(CCondition& cond); }; diff --git a/srtcore/utilities.h b/srtcore/utilities.h index 245d8ffc8..020ec021b 100755 --- a/srtcore/utilities.h +++ b/srtcore/utilities.h @@ -451,6 +451,22 @@ inline bool IsSet(int32_t bitset, int32_t flagset) return (bitset & flagset) == flagset; } +// std::addressof in C++11, +// needs to be provided for C++03 +template +inline RefType* AddressOf(RefType& r) +{ + return (RefType*)(&(unsigned char&)(r)); +} + +template +struct explicit_t +{ + T inobject; + explicit_t(const T& uo): inobject(uo) {} + operator T() const { return inobject; } +}; + // Homecooked version of ref_t. It's a copy of std::reference_wrapper // voided of unwanted properties and renamed to ref_t. diff --git a/srtcore/window.h b/srtcore/window.h index a1c2b71ec..f255c6bfd 100644 --- a/srtcore/window.h +++ b/srtcore/window.h @@ -148,15 +148,15 @@ class CPktTimeWindow: CPktTimeWindowTools m_tsProbeTime(), m_Probe1Sequence(-1) { - pthread_mutex_init(&m_lockPktWindow, NULL); - pthread_mutex_init(&m_lockProbeWindow, NULL); + srt::sync::createMutex(m_lockPktWindow, "PktWindow"); + srt::sync::createMutex(m_lockProbeWindow, "ProbeWindow"); CPktTimeWindowTools::initializeWindowArrays(m_aPktWindow, m_aProbeWindow, m_aBytesWindow, ASIZE, PSIZE); } ~CPktTimeWindow() { - pthread_mutex_destroy(&m_lockPktWindow); - pthread_mutex_destroy(&m_lockProbeWindow); + srt::sync::releaseMutex(m_lockPktWindow); + srt::sync::releaseMutex(m_lockProbeWindow); } /// read the minimum packet sending interval. @@ -326,11 +326,11 @@ class CPktTimeWindow: CPktTimeWindowTools int m_aPktWindow[ASIZE]; // packet information window (inter-packet time) int m_aBytesWindow[ASIZE]; // int m_iPktWindowPtr; // position pointer of the packet info. window. - mutable pthread_mutex_t m_lockPktWindow; // used to synchronize access to the packet window + mutable srt::sync::CMutex m_lockPktWindow; // used to synchronize access to the packet window int m_aProbeWindow[PSIZE]; // record inter-packet time for probing packet pairs int m_iProbeWindowPtr; // position pointer to the probing window - mutable pthread_mutex_t m_lockProbeWindow; // used to synchronize access to the probe window + mutable srt::sync::CMutex m_lockProbeWindow; // used to synchronize access to the probe window int m_iLastSentTime; // last packet sending time int m_iMinPktSndInt; // Minimum packet sending interval diff --git a/test/test_sync.cpp b/test/test_sync.cpp index 1dfe44167..286669b21 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -264,49 +264,34 @@ TEST(SyncTimePoint, OperatorMinusEqDuration) template void TestSyncWaitFor() { - pthread_mutex_t mutex; - pthread_mutex_init(&mutex, NULL); - - pthread_cond_t cond; -#if ENABLE_MONOTONIC_CLOCK - if (USE_MONOTONIC_CLOCK) - { - pthread_condattr_t CondAttribs; - pthread_condattr_init(&CondAttribs); - pthread_condattr_setclock(&CondAttribs, CLOCK_MONOTONIC); - pthread_cond_init(&cond, &CondAttribs); - } - else - { - pthread_cond_init(&cond, NULL); - } -#else - pthread_cond_init(&cond, NULL); -#endif + CCondition cond; + CMutex mutex; + createCond(cond, "cond"); + createMutex_monotonic(mutex, "mutex"); for (int tout_us : {50, 100, 500, 1000, 101000, 1001000}) { const steady_clock::duration timeout = microseconds_from(tout_us); const steady_clock::time_point start = steady_clock::now(); if (USE_MONOTONIC_CLOCK) - EXPECT_FALSE(SyncEvent::wait_for_monotonic(&cond, &mutex, timeout) == 0); + EXPECT_FALSE(CondWaitFor_monotonic(&cond, &mutex, timeout) == 0); else - EXPECT_FALSE(SyncEvent::wait_for(&cond, &mutex, timeout) == 0); + EXPECT_FALSE(CondWaitfor(&cond, &mutex, timeout) == 0); const steady_clock::time_point stop = steady_clock::now(); if (tout_us < 1000) { - cerr << "SyncEvent::wait_for(" << count_microseconds(timeout) << "us) took " << count_microseconds(stop - start) + cerr << "CondWaitFor(" << count_microseconds(timeout) << "us) took " << count_microseconds(stop - start) << "us" << endl; } else { - cerr << "SyncEvent::wait_for(" << count_milliseconds(timeout) << " ms) took " + cerr << "CondWaitFor(" << count_milliseconds(timeout) << " ms) took " << count_microseconds(stop - start) / 1000.0 << " ms" << endl; } } - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&cond); + releaseMutex(mutex); + releaseCond(cond); } @@ -324,69 +309,70 @@ TEST(SyncEvent, WaitForMonotonic) TEST(SyncEvent, WaitForNotifyOne) { - pthread_cond_t cond; - pthread_mutex_t mutex; - pthread_cond_init(&cond, NULL); - pthread_mutex_init(&mutex, NULL); + CCondition cond; + CMutex mutex; + createCond(cond, "cond"); + createMutex(mutex, "mutex"); const steady_clock::duration timeout = seconds_from(5); - auto wait_async = [](pthread_cond_t* cond, pthread_mutex_t* mutex, const steady_clock::duration& timeout) { + auto wait_async = [](CCondition* cond, CMutex* mutex, const steady_clock::duration& timeout) { CGuard gcguard(*mutex); - return SyncEvent::wait_for(cond, mutex, timeout); + return CondWaitFor(&*cond, &*mutex, timeout); }; auto wait_async_res = async(launch::async, wait_async, &cond, &mutex, timeout); EXPECT_EQ(wait_async_res.wait_for(chrono::milliseconds(100)), future_status::timeout); - pthread_cond_signal(&cond); + CSync::signal_relaxed(cond); ASSERT_EQ(wait_async_res.wait_for(chrono::milliseconds(100)), future_status::ready); const int wait_for_res = wait_async_res.get(); EXPECT_TRUE(wait_for_res == 0); - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&cond); + releaseMutex(mutex); + releaseCond(cond); } TEST(SyncEvent, WaitNotifyOne) { - pthread_cond_t cond; - pthread_mutex_t mutex; - pthread_cond_init(&cond, NULL); - pthread_mutex_init(&mutex, NULL); + CCondition cond; + CMutex mutex; + createCond(cond, "cond"); + createMutex(mutex, "mutex"); - auto wait_async = [](pthread_cond_t* cond, pthread_mutex_t* mutex) { + auto wait_async = [](CCondition* cond, CMutex* mutex) { CGuard gcguard(*mutex); - return pthread_cond_wait(cond, mutex); + CSync gcsync(*cond, gcguard); + return gcsync.wait(); }; auto wait_async_res = async(launch::async, wait_async, &cond, &mutex); EXPECT_EQ(wait_async_res.wait_for(chrono::milliseconds(100)), future_status::timeout); - pthread_cond_signal(&cond); + CSync::signal_relaxed(cond); ASSERT_EQ(wait_async_res.wait_for(chrono::milliseconds(100)), future_status::ready); wait_async_res.get(); - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&cond); + releaseMutex(mutex); + releaseCond(cond); } TEST(SyncEvent, WaitForTwoNotifyOne) { - pthread_cond_t cond; - pthread_mutex_t mutex; - pthread_cond_init(&cond, NULL); - pthread_mutex_init(&mutex, NULL); + CCondition cond; + CMutex mutex; + createCond(cond, "cond"); + createMutex(mutex, "mutex"); const steady_clock::duration timeout = seconds_from(3); - auto wait_async = [](pthread_cond_t* cond, pthread_mutex_t* mutex, const steady_clock::duration& timeout) { + auto wait_async = [](CCondition* cond, CMutex* mutex, const steady_clock::duration& timeout) { CGuard gcguard(*mutex); - return SyncEvent::wait_for(cond, mutex, timeout); + return CondWaitFor(&*cond, &*mutex, timeout); }; auto wait_async1_res = async(launch::async, wait_async, &cond, &mutex, timeout); auto wait_async2_res = async(launch::async, wait_async, &cond, &mutex, timeout); EXPECT_EQ(wait_async1_res.wait_for(chrono::milliseconds(100)), future_status::timeout); EXPECT_EQ(wait_async2_res.wait_for(chrono::milliseconds(100)), future_status::timeout); - pthread_cond_signal(&cond); + CSync::signal_relaxed(cond); // Now only one waiting thread should become ready const future_status status1 = wait_async1_res.wait_for(chrono::milliseconds(100)); const future_status status2 = wait_async2_res.wait_for(chrono::milliseconds(100)); @@ -400,28 +386,28 @@ TEST(SyncEvent, WaitForTwoNotifyOne) // Expect timeout on another thread EXPECT_FALSE(isready1 ? (wait_async2_res.get() == 0) : (wait_async1_res.get() == 0)); - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&cond); + releaseMutex(mutex); + releaseCond(cond); } TEST(SyncEvent, WaitForTwoNotifyAll) { - pthread_cond_t cond; - pthread_mutex_t mutex; - pthread_cond_init(&cond, NULL); - pthread_mutex_init(&mutex, NULL); + CCondition cond; + CMutex mutex; + createCond(cond, "cond"); + createMutex(mutex, "mutex"); const steady_clock::duration timeout = seconds_from(3); - auto wait_async = [](pthread_cond_t* cond, pthread_mutex_t* mutex, const steady_clock::duration& timeout) { + auto wait_async = [](CCondition* cond, CMutex* mutex, const steady_clock::duration& timeout) { CGuard gcguard(*mutex); - return SyncEvent::wait_for(cond, mutex, timeout); + return CondWaitFor(&*cond, &*mutex, timeout); }; auto wait_async1_res = async(launch::async, wait_async, &cond, &mutex, timeout); auto wait_async2_res = async(launch::async, wait_async, &cond, &mutex, timeout); EXPECT_EQ(wait_async1_res.wait_for(chrono::milliseconds(100)), future_status::timeout); EXPECT_EQ(wait_async2_res.wait_for(chrono::milliseconds(100)), future_status::timeout); - pthread_cond_broadcast(&cond); + CSync::broadcast_relaxed(cond); // Now only one waiting thread should become ready const future_status status1 = wait_async1_res.wait_for(chrono::milliseconds(100)); const future_status status2 = wait_async2_res.wait_for(chrono::milliseconds(100)); @@ -431,32 +417,32 @@ TEST(SyncEvent, WaitForTwoNotifyAll) EXPECT_TRUE(wait_async1_res.get() == 0); EXPECT_TRUE(wait_async2_res.get() == 0); - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&cond); + releaseMutex(mutex); + releaseCond(cond); } TEST(SyncEvent, WaitForNotifyAll) { - pthread_cond_t cond; - pthread_mutex_t mutex; - pthread_cond_init(&cond, NULL); - pthread_mutex_init(&mutex, NULL); + CCondition cond; + CMutex mutex; + createCond(cond, "cond"); + createMutex(mutex, "mutex"); const steady_clock::duration timeout = seconds_from(5); - auto wait_async = [](pthread_cond_t* cond, pthread_mutex_t* mutex, const steady_clock::duration& timeout) { + auto wait_async = [](CCondition* cond, CMutex* mutex, const steady_clock::duration& timeout) { CGuard gcguard(*mutex); - return SyncEvent::wait_for(cond, mutex, timeout); + return CondWaitFor(&*cond, &*mutex, timeout); }; auto wait_async_res = async(launch::async, wait_async, &cond, &mutex, timeout); EXPECT_EQ(wait_async_res.wait_for(chrono::milliseconds(500)), future_status::timeout); - pthread_cond_broadcast(&cond); + CSync::broadcast_relaxed(cond); ASSERT_EQ(wait_async_res.wait_for(chrono::milliseconds(500)), future_status::ready); const int wait_for_res = wait_async_res.get(); EXPECT_TRUE(wait_for_res == 0); - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&cond); + releaseMutex(mutex); + releaseCond(cond); } /*****************************************************************************/ From 5d7e266356226034a3c972306f32dc038d2f4894 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Wed, 8 Jan 2020 19:52:37 +0100 Subject: [PATCH 3/6] Fixed merge errors --- srtcore/sync.cpp | 10 +++++++--- test/test_sync.cpp | 6 +++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/srtcore/sync.cpp b/srtcore/sync.cpp index 102067e97..7f7d49a30 100644 --- a/srtcore/sync.cpp +++ b/srtcore/sync.cpp @@ -100,19 +100,23 @@ const int64_t s_cpu_frequency = get_cpu_frequency(); // Automatically lock in constructor -CGuard::CGuard(pthread_mutex_t& lock, explicit_t shouldwork): +CGuard::CGuard(CMutex& lock, explicit_t shouldwork): m_Mutex(lock), m_iLocked(-1) { if (shouldwork) - m_iLocked = pthread_mutex_lock(&m_Mutex); + { + Lock(); + } } // Automatically unlock in destructor CGuard::~CGuard() { if (m_iLocked == 0) - pthread_mutex_unlock(&m_Mutex); + { + Unlock(); + } } int CGuard::enterCS(CMutex& lock, explicit_t block) diff --git a/test/test_sync.cpp b/test/test_sync.cpp index 286669b21..a683f16f1 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -266,8 +266,8 @@ void TestSyncWaitFor() { CCondition cond; CMutex mutex; - createCond(cond, "cond"); - createMutex_monotonic(mutex, "mutex"); + createCond_monotonic(cond, "cond"); + createMutex(mutex, "mutex"); for (int tout_us : {50, 100, 500, 1000, 101000, 1001000}) { @@ -276,7 +276,7 @@ void TestSyncWaitFor() if (USE_MONOTONIC_CLOCK) EXPECT_FALSE(CondWaitFor_monotonic(&cond, &mutex, timeout) == 0); else - EXPECT_FALSE(CondWaitfor(&cond, &mutex, timeout) == 0); + EXPECT_FALSE(CondWaitFor(&cond, &mutex, timeout) == 0); const steady_clock::time_point stop = steady_clock::now(); if (tout_us < 1000) { From b3875197fb9cc88211b27bf37b70562e932c7282 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Thu, 9 Jan 2020 09:24:36 +0100 Subject: [PATCH 4/6] Fixed test bug: configure cond monotonic only if monotonic tested. Some cosmetix --- srtcore/sync.cpp | 1 - test/test_sync.cpp | 9 ++++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/srtcore/sync.cpp b/srtcore/sync.cpp index 7f7d49a30..f4c704670 100644 --- a/srtcore/sync.cpp +++ b/srtcore/sync.cpp @@ -174,7 +174,6 @@ void releaseMutex(CMutex& lock) void createCond(CCondition& cond, const char* name SRT_ATR_UNUSED) { - pthread_condattr_t* pattr = NULL; pthread_cond_init(RawAddr(cond), pattr); } diff --git a/test/test_sync.cpp b/test/test_sync.cpp index a683f16f1..96dada708 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -261,19 +261,22 @@ TEST(SyncTimePoint, OperatorMinusEqDuration) */ /*****************************************************************************/ -template +template void TestSyncWaitFor() { CCondition cond; CMutex mutex; - createCond_monotonic(cond, "cond"); + if (TEST_MONOTONIC_CLOCK) + createCond_monotonic(cond, "cond"); + else + createCond(cond, "cond"); createMutex(mutex, "mutex"); for (int tout_us : {50, 100, 500, 1000, 101000, 1001000}) { const steady_clock::duration timeout = microseconds_from(tout_us); const steady_clock::time_point start = steady_clock::now(); - if (USE_MONOTONIC_CLOCK) + if (TEST_MONOTONIC_CLOCK) EXPECT_FALSE(CondWaitFor_monotonic(&cond, &mutex, timeout) == 0); else EXPECT_FALSE(CondWaitFor(&cond, &mutex, timeout) == 0); From a0558cb70326b389e9a1c16c1aacd8d10d17d130 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Mon, 13 Jan 2020 18:37:38 +0100 Subject: [PATCH 5/6] Fixed conditional compiling for unit test --- test/test_sync.cpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/test/test_sync.cpp b/test/test_sync.cpp index 96dada708..d0e440189 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -266,20 +266,32 @@ void TestSyncWaitFor() { CCondition cond; CMutex mutex; +#if ENABLE_MONOTONIC_CLOCK if (TEST_MONOTONIC_CLOCK) + { createCond_monotonic(cond, "cond"); + } else + { createCond(cond, "cond"); + } +#else + createCond(cond, "cond"); +#endif createMutex(mutex, "mutex"); for (int tout_us : {50, 100, 500, 1000, 101000, 1001000}) { const steady_clock::duration timeout = microseconds_from(tout_us); const steady_clock::time_point start = steady_clock::now(); +#if ENABLE_MONOTONIC_CLOCK if (TEST_MONOTONIC_CLOCK) EXPECT_FALSE(CondWaitFor_monotonic(&cond, &mutex, timeout) == 0); else EXPECT_FALSE(CondWaitFor(&cond, &mutex, timeout) == 0); +#else + EXPECT_FALSE(CondWaitFor(&cond, &mutex, timeout) == 0); +#endif const steady_clock::time_point stop = steady_clock::now(); if (tout_us < 1000) { From f6a03f674930df77cbe46c1cec349bd6556cd555 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Wed, 15 Jan 2020 11:14:52 +0100 Subject: [PATCH 6/6] Fixed explicit_t to make it really function --- srtcore/utilities.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/srtcore/utilities.h b/srtcore/utilities.h index 020ec021b..7ea2a6988 100755 --- a/srtcore/utilities.h +++ b/srtcore/utilities.h @@ -465,6 +465,10 @@ struct explicit_t T inobject; explicit_t(const T& uo): inobject(uo) {} operator T() const { return inobject; } + +private: + template + explicit_t(const X& another); }; // Homecooked version of ref_t. It's a copy of std::reference_wrapper