-
Notifications
You must be signed in to change notification settings - Fork 851
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] Fix a data race by adding a shared mutex #2961
Changes from all commits
3f83e2a
82a052e
a4686d3
c91ebee
44d298a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -67,6 +67,36 @@ namespace srt | |||||
{ | ||||||
class CChannel; | ||||||
class CUDT; | ||||||
class CUDTWrapper; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The class definition follows below. This declaration is excessive. |
||||||
|
||||||
class CUDTWrapper { | ||||||
public: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
CUDT *udt; | ||||||
sync::SharedMutex mut; | ||||||
|
||||||
public: | ||||||
CUDTWrapper() | ||||||
:udt(NULL) | ||||||
,mut() | ||||||
{ | ||||||
} | ||||||
void lockRead() | ||||||
{ | ||||||
return mut.lockRead(); | ||||||
} | ||||||
void lockWrite() | ||||||
{ | ||||||
return mut.lockWrite(); | ||||||
} | ||||||
void unlockRead() | ||||||
{ | ||||||
return mut.unlockRead(); | ||||||
|
||||||
} | ||||||
void unlockWrite(){ | ||||||
return mut.unlockWrite(); | ||||||
} | ||||||
}; | ||||||
Comment on lines
+72
to
+99
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't quite like the name. It does not express what the class does, except for wrapping template <class T>
class CSharedObject
{
private:
T* m_pObj;
sync::SharedMutex m_mtx;
public:
// ...
}; |
||||||
|
||||||
struct CUnit | ||||||
{ | ||||||
|
@@ -555,7 +585,7 @@ class CRcvQueue | |||||
|
||||||
private: | ||||||
sync::Mutex m_LSLock; | ||||||
CUDT* m_pListener; // pointer to the (unique, if any) listening UDT entity | ||||||
CUDTWrapper m_pListener; // pointer to the (unique, if any) listening UDT entity | ||||||
CRendezvousQueue* m_pRendezvousQueue; // The list of sockets in rendezvous mode | ||||||
|
||||||
std::vector<CUDT*> m_vNewEntry; // newly added entries, to be inserted | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ | |
#define INC_SRT_SYNC_H | ||
|
||
#include "platform_sys.h" | ||
#include <limits.h> | ||
|
||
#include <cstdlib> | ||
#include <limits> | ||
|
@@ -943,9 +944,164 @@ | |
/// @param[in] maxVal maximum allowed value of the resulting random number. | ||
int genRandomInt(int minVal, int maxVal); | ||
|
||
class SharedMutex | ||
{ | ||
private: | ||
Condition m_pLockWriteCond; | ||
Condition m_pLockReadCond; | ||
|
||
Mutex m_pMutex; | ||
Mutex m_pMutex2; | ||
|
||
int m_pCountRead; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "p" mean pointer, while the variable is not a pointer. |
||
bool m_pWriterLocked; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 'p' means "pointer". This variable is of a type "Boolean", so should be |
||
|
||
|
||
public: | ||
SharedMutex() | ||
:m_pLockWriteCond() | ||
,m_pLockReadCond() | ||
,m_pMutex() | ||
,m_pMutex2() | ||
,m_pCountRead(0) | ||
,m_pWriterLocked(false) | ||
{ | ||
m_pCountRead = 0; | ||
m_pWriterLocked = false; | ||
|
||
} | ||
|
||
void lockWrite() | ||
{ | ||
UniqueLock l1(m_pMutex); | ||
if(m_pWriterLocked) | ||
m_pLockWriteCond.wait(l1); | ||
m_pWriterLocked = true; | ||
if(m_pCountRead) | ||
m_pLockReadCond.wait(l1); | ||
|
||
|
||
} | ||
|
||
void unlockWrite() | ||
{ | ||
UniqueLock l2(m_pMutex); | ||
m_pWriterLocked = false; | ||
l2.unlock(); | ||
std::cout << "NOTIFY ALL" << std::endl; | ||
m_pLockWriteCond.notify_all(); | ||
std::cout << "WRITER NOTIFIED" << std::endl; | ||
|
||
} | ||
|
||
void lockRead() | ||
{ | ||
std::cout << "TRY LOCK READ " << this->m_pCountRead << this->m_pWriterLocked << std::endl; | ||
UniqueLock l3(m_pMutex); | ||
if(m_pWriterLocked) | ||
m_pLockWriteCond.wait(l3); | ||
m_pCountRead++; | ||
std::cout << "LOCKED READ" << std::endl; | ||
} | ||
|
||
void unlockRead() | ||
{ | ||
std::cout << "UNLOCK READ" << std::endl; | ||
ScopedLock l4(m_pMutex); | ||
m_pCountRead--; | ||
if(m_pWriterLocked && m_pCountRead == 0) | ||
m_pLockReadCond.notify_one(); | ||
else if (m_pCountRead > 0) | ||
m_pLockWriteCond.notify_one(); | ||
std::cout << "READ UNLOCKED" << std::endl; | ||
|
||
|
||
} | ||
|
||
}; | ||
|
||
/* REFERENCE IMPLEMENTATION | ||
class shared_mutex | ||
{ | ||
Mutex mut_; | ||
Condition gate1_; | ||
Condition gate2_; | ||
unsigned state_; | ||
|
||
static const unsigned write_entered_ = 1U << (sizeof(unsigned)*CHAR_BIT - 1); | ||
static const unsigned n_readers_ = ~write_entered_; | ||
|
||
public: | ||
|
||
shared_mutex() : state_(0) {} | ||
|
||
|
||
// Exclusive ownership | ||
|
||
void | ||
lock() | ||
{ | ||
UniqueLock lk(mut_); | ||
std::cout << "LOCK WRITE " << std::endl; | ||
while (state_ & write_entered_) | ||
gate1_.wait(lk); | ||
state_ |= write_entered_; | ||
while (state_ & n_readers_) | ||
gate2_.wait(lk); | ||
std::cout << "LOCK WRITE DONE" << std::endl; | ||
|
||
} | ||
|
||
void | ||
unlock() | ||
{ | ||
{ | ||
ScopedLock _(mut_); | ||
state_ = 0; | ||
} | ||
std::cout << "UNLOCK WRITE " << std::endl; | ||
gate1_.notify_all(); | ||
std::cout << "UNLOCK WRITE DONE" << std::endl; | ||
|
||
} | ||
|
||
// Shared ownership | ||
|
||
void | ||
lock_shared() | ||
{ | ||
UniqueLock lk(mut_); | ||
while ((state_ & write_entered_) || (state_ & n_readers_) == n_readers_) | ||
gate1_.wait(lk); | ||
unsigned num_readers = (state_ & n_readers_) + 1; | ||
state_ &= ~n_readers_; | ||
state_ |= num_readers; | ||
} | ||
|
||
void | ||
unlock_shared() | ||
{ | ||
ScopedLock _(mut_); | ||
unsigned num_readers = (state_ & n_readers_) - 1; | ||
state_ &= ~n_readers_; | ||
state_ |= num_readers; | ||
if (state_ & write_entered_) | ||
{ | ||
if (num_readers == 0) | ||
gate2_.notify_one(); | ||
} | ||
else | ||
{ | ||
if (num_readers == n_readers_ - 1) | ||
gate1_.notify_one(); | ||
} | ||
} | ||
};*/ | ||
Comment on lines
+1023
to
+1099
Check notice Code scanning / CodeQL Commented-out code Note
This comment appears to contain commented-out code.
|
||
|
||
} // namespace sync | ||
} // namespace srt | ||
|
||
|
||
#include "atomic_clock.h" | ||
|
||
#endif // INC_SRT_SYNC_H |
Check notice
Code scanning / CodeQL
Commented-out code Note