Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Added shared_lock to srt::sync. #2996

Merged
merged 2 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 8 additions & 14 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1129,8 +1129,6 @@ srt::CRcvQueue::CRcvQueue()
, m_iIPversion()
, m_szPayloadSize()
, m_bClosing(false)
, m_LSLock()
, m_pListener(NULL)
, m_pRendezvousQueue(NULL)
, m_vNewEntry()
, m_IDLock()
Expand Down Expand Up @@ -1404,11 +1402,13 @@ srt::EConnectStatus srt::CRcvQueue::worker_ProcessConnectionRequest(CUnit* unit,
int listener_ret = SRT_REJ_UNKNOWN;
bool have_listener = false;
{
ScopedLock cg(m_LSLock);
if (m_pListener)
SharedLock shl(m_pListener);
CUDT* pListener = m_pListener.getPtrNoLock();

if (pListener)
{
LOGC(cnlog.Debug, log << "PASSING request from: " << addr.str() << " to listener:" << m_pListener->socketID());
listener_ret = m_pListener->processConnectRequest(addr, unit->m_Packet);
LOGC(cnlog.Debug, log << "PASSING request from: " << addr.str() << " to listener:" << pListener->socketID());
listener_ret = pListener->processConnectRequest(addr, unit->m_Packet);

// This function does return a code, but it's hard to say as to whether
// anything can be done about it. In case when it's stated possible, the
Expand Down Expand Up @@ -1690,21 +1690,15 @@ int srt::CRcvQueue::recvfrom(int32_t id, CPacket& w_packet)

int srt::CRcvQueue::setListener(CUDT* u)
{
ScopedLock lslock(m_LSLock);

if (NULL != m_pListener)
if (!m_pListener.set(u))
return -1;

m_pListener = u;
return 0;
}

void srt::CRcvQueue::removeListener(const CUDT* u)
{
ScopedLock lslock(m_LSLock);

if (u == m_pListener)
m_pListener = NULL;
m_pListener.clearIf(u);
}

void srt::CRcvQueue::registerConnector(const SRTSOCKET& id,
Expand Down
5 changes: 2 additions & 3 deletions srtcore/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -554,9 +554,8 @@ class CRcvQueue
void storePktClone(int32_t id, const CPacket& pkt);

private:
sync::Mutex m_LSLock;
CUDT* m_pListener; // pointer to the (unique, if any) listening UDT entity
CRendezvousQueue* m_pRendezvousQueue; // The list of sockets in rendezvous mode
sync::CSharedObjectPtr<CUDT> m_pListener; // pointer to the (unique, if any) listening UDT entity
CRendezvousQueue* m_pRendezvousQueue; // The list of sockets in rendezvous mode

std::vector<CUDT*> m_vNewEntry; // newly added entries, to be inserted
sync::Mutex m_IDLock;
Expand Down
163 changes: 123 additions & 40 deletions srtcore/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -343,15 +343,19 @@ class SRT_ATTR_CAPABILITY("mutex") Mutex
pthread_mutex_t m_mutex;
};

/// A pthread version of std::chrono::scoped_lock<mutex> (or lock_guard for C++11)
/// A pthread version of std::scoped_lock (or lock_guard for C++11).
class SRT_ATTR_SCOPED_CAPABILITY ScopedLock
{
public:
SRT_ATTR_ACQUIRE(m)
explicit ScopedLock(Mutex& m);
explicit ScopedLock(Mutex& m)
: m_mutex(m)
{
m_mutex.lock();
}

SRT_ATTR_RELEASE()
~ScopedLock();
~ScopedLock() { m_mutex.unlock(); }

private:
Mutex& m_mutex;
Expand Down Expand Up @@ -481,6 +485,122 @@ class Condition
inline void setupCond(Condition& cv, const char*) { cv.init(); }
inline void releaseCond(Condition& cv) { cv.destroy(); }

///////////////////////////////////////////////////////////////////////////////
//
// Shared Mutex section
//
///////////////////////////////////////////////////////////////////////////////

/// Implementation of a read-write mutex.
/// This allows multiple readers at a time, or a single writer.
/// TODO: The class can be improved if needed to give writer a preference
/// by adding additional m_iWritersWaiting member variable (counter).
/// TODO: The m_iCountRead could be made atomic to make unlock_shared() faster and lock-free.
class SharedMutex
{
public:
    SharedMutex();
    ~SharedMutex();

public:
    /// Acquire the lock for writing purposes. Only one thread can acquire this lock at a time.
    /// Once it is locked, no reader can acquire it.
    void lock();
    bool try_lock();
    void unlock();

    /// Acquire the lock if no writer already has it. For read purposes only.
    /// Several readers can lock this at the same time.
    void lock_shared();
    bool try_lock_shared();
    void unlock_shared();

    /// @return the current number of shared (reader) holders.
    /// NOTE(review): presumably intended for diagnostics/tests — confirm against callers.
    int getReaderCount() const;

protected:
    Condition m_LockWriteCond; // condition for threads waiting to lock (definitions in sync.cpp)
    Condition m_LockReadCond;  // condition for threads waiting to lock_shared

    mutable Mutex m_Mutex;     // mutable: also guards state inside const getReaderCount()

    int m_iCountRead;          // number of readers currently holding the shared lock
    bool m_bWriterLocked;      // true while a writer holds the exclusive lock
};

/// RAII guard taking exclusive (writer) ownership of a SharedMutex for its
/// whole scope; the counterpart of std::scoped_lock<std::shared_mutex>
/// (or lock_guard in C++11).
/// A template-based srt::sync::ScopedLock could have served the same purpose,
/// but then every C++03 usage would need the explicit form ScopedLock<Mutex>.
class SRT_ATTR_SCOPED_CAPABILITY ExclusiveLock
{
public:
    // Locks the mutex exclusively on construction.
    SRT_ATTR_ACQUIRE(m)
    explicit ExclusiveLock(SharedMutex& m)
        : m_rLocked(m)
    {
        m_rLocked.lock();
    }

    // Releases the exclusive lock on scope exit.
    SRT_ATTR_RELEASE()
    ~ExclusiveLock() { m_rLocked.unlock(); }

private:
    SharedMutex& m_rLocked;
};

/// RAII guard taking shared (reader) ownership of a SharedMutex for its
/// whole scope; a reduced counterpart of std::shared_lock (C++14).
class SRT_ATTR_SCOPED_CAPABILITY SharedLock
{
public:
    // Acquires the shared (read) lock on construction.
    SRT_ATTR_ACQUIRE_SHARED(m)
    explicit SharedLock(SharedMutex& m)
        : m_rShared(m)
    {
        m_rShared.lock_shared();
    }

    // Releases the shared lock on scope exit.
    SRT_ATTR_RELEASE_SHARED(m_rShared)
    ~SharedLock() { m_rShared.unlock_shared(); }

private:
    SharedMutex& m_rShared;
};

/// A class template for a shared object. It is a wrapper around a pointer to an
/// object and a shared mutex (inherited). It allows multiple readers to access
/// the object at the same time, but only one writer can modify it at a time.
/// NOTE: getPtrNoLock() performs no locking; the caller is expected to hold a
/// SharedLock on this object for as long as the returned pointer is in use.
template <class T>
class CSharedObjectPtr : public SharedMutex
{
public:
    /// Construct with no object attached.
    /// The injected-class-name must not carry a template-argument-list when
    /// used as a constructor declarator (CSharedObjectPtr<T>() is deprecated
    /// in C++17 and ill-formed in C++20, CWG 2237).
    CSharedObjectPtr()
        : m_pObj(NULL)
    {
    }

    /// Attach an object, unless one is already attached (write-locked).
    /// @param pObj the object to store.
    /// @return true if stored; false if another object was already set.
    bool set(T* pObj)
    {
        ExclusiveLock lock(*this);
        if (m_pObj)
            return false;
        m_pObj = pObj;
        return true;
    }

    /// Detach the stored object, but only if it matches the one provided (write-locked).
    /// @param pObj the object expected to be stored.
    /// @return true if cleared; false if a different object (or none) is stored.
    bool clearIf(const T* pObj)
    {
        ExclusiveLock lock(*this);
        if (m_pObj != pObj)
            return false;
        m_pObj = NULL;
        return true;
    }

    /// Get the raw pointer without taking the lock (caller must hold the lock).
    T* getPtrNoLock() const { return m_pObj; }

private:
    T* m_pObj;
};

///////////////////////////////////////////////////////////////////////////////
//
// Event (CV) section
Expand Down Expand Up @@ -943,43 +1063,6 @@ CUDTException& GetThreadLocalError();
/// @param[in] maxVal maximum allowed value of the resulting random number.
int genRandomInt(int minVal, int maxVal);


/// Implementation of a read-write mutex.
/// This allows multiple readers at a time, or a single writer.
/// TODO: The class can be improved if needed to give writer a preference
/// by adding additional m_iWritersWaiting member variable (counter).
/// TODO: The m_iCountRead could be made atomic to make unlock_shared() faster and lock-free.
class SharedMutex
{
public:
    SharedMutex();
    ~SharedMutex();

private:
    Condition m_LockWriteCond; // condition for threads waiting to lock (definitions elsewhere)
    Condition m_LockReadCond;  // condition for threads waiting to lock_shared

    mutable Mutex m_Mutex;     // mutable: also guards state inside const getReaderCount()

    int m_iCountRead;          // number of readers currently holding the shared lock
    bool m_bWriterLocked;      // true while a writer holds the exclusive lock

    /// Acquire the lock for writing purposes. Only one thread can acquire this lock at a time.
    /// Once it is locked, no reader can acquire it.
public:
    void lock();
    bool try_lock();
    void unlock();

    /// Acquire the lock if no writer already has it. For read purposes only.
    /// Several readers can lock this at the same time.
    void lock_shared();
    bool try_lock_shared();
    void unlock_shared();

    /// @return the current number of shared (reader) holders.
    int getReaderCount() const;
};

} // namespace sync
} // namespace srt

Expand Down
12 changes: 0 additions & 12 deletions srtcore/sync_posix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,18 +230,6 @@ bool srt::sync::Mutex::try_lock()
return (pthread_mutex_trylock(&m_mutex) == 0);
}

srt::sync::ScopedLock::ScopedLock(Mutex& m)
: m_mutex(m)
{
m_mutex.lock();
}

srt::sync::ScopedLock::~ScopedLock()
{
m_mutex.unlock();
}


srt::sync::UniqueLock::UniqueLock(Mutex& m)
: m_Mutex(m)
{
Expand Down
Loading