Skip to content

Commit

Permalink
[core] Added shared_lock.
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko committed Aug 5, 2024
1 parent 968c9f9 commit 75f8526
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 62 deletions.
24 changes: 9 additions & 15 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1129,8 +1129,6 @@ srt::CRcvQueue::CRcvQueue()
, m_iIPversion()
, m_szPayloadSize()
, m_bClosing(false)
, m_LSLock()
, m_pListener(NULL)
, m_pRendezvousQueue(NULL)
, m_vNewEntry()
, m_IDLock()
Expand Down Expand Up @@ -1404,11 +1402,13 @@ srt::EConnectStatus srt::CRcvQueue::worker_ProcessConnectionRequest(CUnit* unit,
int listener_ret = SRT_REJ_UNKNOWN;
bool have_listener = false;
{
ScopedLock cg(m_LSLock);
if (m_pListener)
SharedLock shl(m_pListener);
CUDT* pListener = m_pListener.getPtrNoLock();

if (pListener)
{
LOGC(cnlog.Debug, log << "PASSING request from: " << addr.str() << " to listener:" << m_pListener->socketID());
listener_ret = m_pListener->processConnectRequest(addr, unit->m_Packet);
LOGC(cnlog.Debug, log << "PASSING request from: " << addr.str() << " to listener:" << pListener->socketID());
listener_ret = pListener->processConnectRequest(addr, unit->m_Packet);

// This function does return a code, but it's hard to say as to whether
// anything can be done about it. In case when it's stated possible, the
Expand Down Expand Up @@ -1690,21 +1690,15 @@ int srt::CRcvQueue::recvfrom(int32_t id, CPacket& w_packet)

int srt::CRcvQueue::setListener(CUDT* u)
{
ScopedLock lslock(m_LSLock);

if (NULL != m_pListener)
return -1;
if (!m_pListener.set(u))
return -1;

m_pListener = u;
return 0;
}

void srt::CRcvQueue::removeListener(const CUDT* u)
{
ScopedLock lslock(m_LSLock);

if (u == m_pListener)
m_pListener = NULL;
m_pListener.clearIf(u);
}

void srt::CRcvQueue::registerConnector(const SRTSOCKET& id,
Expand Down
5 changes: 2 additions & 3 deletions srtcore/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -554,9 +554,8 @@ class CRcvQueue
void storePktClone(int32_t id, const CPacket& pkt);

private:
sync::Mutex m_LSLock;
CUDT* m_pListener; // pointer to the (unique, if any) listening UDT entity
CRendezvousQueue* m_pRendezvousQueue; // The list of sockets in rendezvous mode
sync::CSharedObjectPtr<CUDT> m_pListener; // pointer to the (unique, if any) listening UDT entity
CRendezvousQueue* m_pRendezvousQueue; // The list of sockets in rendezvous mode

std::vector<CUDT*> m_vNewEntry; // newly added entries, to be inserted
sync::Mutex m_IDLock;
Expand Down
133 changes: 101 additions & 32 deletions srtcore/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,17 +344,22 @@ class SRT_ATTR_CAPABILITY("mutex") Mutex
};

/// A pthread version of std::chrono::scoped_lock<mutex> (or lock_guard for C++11)
template <class _Mutex>
class SRT_ATTR_SCOPED_CAPABILITY ScopedLock
{
public:
SRT_ATTR_ACQUIRE(m)
explicit ScopedLock(Mutex& m);
explicit ScopedLock(_Mutex& m)
: m_mutex(m)
{
m_mutex.lock();
}

SRT_ATTR_RELEASE()
~ScopedLock();
~ScopedLock() { m_mutex.unlock(); }

private:
Mutex& m_mutex;
_Mutex& m_mutex;
};

/// A pthread version of std::chrono::unique_lock<mutex>
Expand Down Expand Up @@ -481,6 +486,67 @@ class Condition
inline void setupCond(Condition& cv, const char*) { cv.init(); }
inline void releaseCond(Condition& cv) { cv.destroy(); }

///////////////////////////////////////////////////////////////////////////////
//
// Shared Mutex section
//
///////////////////////////////////////////////////////////////////////////////

/// Implementation of a read-write mutex.
/// This allows multiple readers at a time, or a single writer.
/// TODO: The class can be improved if needed to give writer a preference
/// by adding additional m_iWritersWaiting member variable (counter).
/// TODO: The m_iCountRead could be made atomic to make unlok_shared() faster and lock-free.
class SharedMutex
{
public:
SharedMutex();
~SharedMutex();

public:
/// Acquire the lock for writting purposes. Only one thread can acquire this lock at a time
/// Once it is locked, no reader can acquire it
void lock();
bool try_lock();
void unlock();

/// Acquire the lock if no writter already has it. For read purpose only
/// Several readers can lock this at the same time.
void lock_shared();
bool try_lock_shared();
void unlock_shared();

int getReaderCount() const;

protected:
Condition m_LockWriteCond;
Condition m_LockReadCond;

mutable Mutex m_Mutex;

int m_iCountRead;
bool m_bWriterLocked;

};

/// A reduced implementation of the std::shared_lock functionality (available in C++14).
class SRT_ATTR_SCOPED_CAPABILITY SharedLock
{
public:
SRT_ATTR_ACQUIRE_SHARED(m)
explicit SharedLock(SharedMutex& m)
: m_mtx(m)
{
m_mtx.lock_shared();
}

SRT_ATTR_RELEASE_SHARED(m_mtx)
~SharedLock() { m_mtx.unlock_shared(); }

private:
SharedMutex& m_mtx;
};

///////////////////////////////////////////////////////////////////////////////
//
// Event (CV) section
Expand Down Expand Up @@ -943,41 +1009,44 @@ CUDTException& GetThreadLocalError();
/// @param[in] maxVal maximum allowed value of the resulting random number.
int genRandomInt(int minVal, int maxVal);


/// Implementation of a read-write mutex.
/// This allows multiple readers at a time, or a single writer.
/// TODO: The class can be improved if needed to give writer a preference
/// by adding additional m_iWritersWaiting member variable (counter).
/// TODO: The m_iCountRead could be made atomic to make unlok_shared() faster and lock-free.
class SharedMutex
/// A class template for a shared object. It is a wrapper around a pointer to an object
/// and a shared mutex. It allows multiple readers to access the object at the same time,
/// but only one writer can access the object at a time.
template <class T>
class CSharedObjectPtr
: public SharedMutex
{
public:
SharedMutex();
~SharedMutex();

private:
Condition m_LockWriteCond;
Condition m_LockReadCond;

mutable Mutex m_Mutex;
CSharedObjectPtr<T>()
: m_pObj(NULL)
{
}

int m_iCountRead;
bool m_bWriterLocked;
bool set(T* pObj)
{
ScopedLock lock(m_Mutex);
if (m_pObj)
return false;
m_pObj = pObj;
return true;
}

/// Acquire the lock for writting purposes. Only one thread can acquire this lock at a time
/// Once it is locked, no reader can acquire it
public:
void lock();
bool try_lock();
void unlock();
bool clearIf(const T* pObj)
{
ScopedLock lock(m_Mutex);
if (m_pObj != pObj)
return false;
m_pObj = NULL;
return true;
}

/// Acquire the lock if no writter already has it. For read purpose only
/// Several readers can lock this at the same time.
void lock_shared();
bool try_lock_shared();
void unlock_shared();
T* getPtrNoLock() const
{
return m_pObj;
}

int getReaderCount() const;
private:
T* m_pObj;
};

} // namespace sync
Expand Down
12 changes: 0 additions & 12 deletions srtcore/sync_posix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,18 +230,6 @@ bool srt::sync::Mutex::try_lock()
return (pthread_mutex_trylock(&m_mutex) == 0);
}

srt::sync::ScopedLock::ScopedLock(Mutex& m)
: m_mutex(m)
{
m_mutex.lock();
}

srt::sync::ScopedLock::~ScopedLock()
{
m_mutex.unlock();
}


srt::sync::UniqueLock::UniqueLock(Mutex& m)
: m_Mutex(m)
{
Expand Down

0 comments on commit 75f8526

Please sign in to comment.