Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread API wrappers required for thread logging #1054

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 51 additions & 48 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ m_AcceptLock(),
m_uiBackLog(0),
m_iMuxID(-1)
{
pthread_mutex_init(&m_AcceptLock, NULL);
pthread_cond_init(&m_AcceptCond, NULL);
pthread_mutex_init(&m_ControlLock, NULL);
createMutex(m_AcceptLock, "Accept");
createCond(m_AcceptCond, "Accept");
createMutex(m_ControlLock, "Control");
}

CUDTSocket::~CUDTSocket()
Expand All @@ -110,9 +110,9 @@ CUDTSocket::~CUDTSocket()
delete m_pQueuedSockets;
delete m_pAcceptSockets;

pthread_mutex_destroy(&m_AcceptLock);
pthread_cond_destroy(&m_AcceptCond);
pthread_mutex_destroy(&m_ControlLock);
releaseMutex(m_AcceptLock);
releaseCond(m_AcceptCond);
releaseMutex(m_ControlLock);
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -145,9 +145,9 @@ m_ClosedSockets()
srand((unsigned int)t.tv_usec);
m_SocketIDGenerator = 1 + (int)((1 << 30) * (double(rand()) / RAND_MAX));

pthread_mutex_init(&m_GlobControlLock, NULL);
pthread_mutex_init(&m_IDLock, NULL);
pthread_mutex_init(&m_InitLock, NULL);
createMutex(m_GlobControlLock, "GlobControl");
createMutex(m_IDLock, "ID");
createMutex(m_InitLock, "Init");

pthread_key_create(&m_TLSError, TLSDestroy);

Expand All @@ -164,9 +164,9 @@ CUDTUnited::~CUDTUnited()
cleanup();
}

pthread_mutex_destroy(&m_GlobControlLock);
pthread_mutex_destroy(&m_IDLock);
pthread_mutex_destroy(&m_InitLock);
releaseMutex(m_GlobControlLock);
releaseMutex(m_IDLock);
releaseMutex(m_InitLock);

delete (CUDTException*)pthread_getspecific(m_TLSError);
pthread_key_delete(m_TLSError);
Expand Down Expand Up @@ -209,15 +209,8 @@ int CUDTUnited::startup()
return true;

m_bClosing = false;
pthread_mutex_init(&m_GCStopLock, NULL);
#if ENABLE_MONOTONIC_CLOCK
pthread_condattr_t CondAttribs;
pthread_condattr_init(&CondAttribs);
pthread_condattr_setclock(&CondAttribs, CLOCK_MONOTONIC);
pthread_cond_init(&m_GCStopCond, &CondAttribs);
#else
pthread_cond_init(&m_GCStopCond, NULL);
#endif
createMutex(m_GCStopLock, "GCStop");
createCond_monotonic(m_GCStopCond, "GCStop");
{
ThreadName tn("SRT:GC");
pthread_create(&m_GCThread, NULL, garbageCollect, this);
Expand All @@ -241,8 +234,13 @@ int CUDTUnited::cleanup()
return 0;

m_bClosing = true;
pthread_cond_signal(&m_GCStopCond);
pthread_join(m_GCThread, NULL);
// NOTE: we can do relaxed signaling here because
// waiting on m_GCStopCond has a 1-second timeout,
// after which the m_bClosing flag is cheched, which
// is set here above. Worst case secenario, this
// jointhread() call will block for 1 second.
CSync::signal_relaxed(m_GCStopCond);
jointhread(m_GCThread);

// XXX There's some weird bug here causing this
// to hangup on Windows. This might be either something
Expand All @@ -251,8 +249,8 @@ int CUDTUnited::cleanup()
// tolerated with simply exit the application without cleanup,
// counting on that the system will take care of it anyway.
#ifndef _WIN32
pthread_mutex_destroy(&m_GCStopLock);
pthread_cond_destroy(&m_GCStopCond);
releaseMutex(m_GCStopLock);
releaseCond(m_GCStopCond);
#endif

m_bGCStatus = false;
Expand Down Expand Up @@ -502,7 +500,7 @@ int CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr_any& peer,
SRTSOCKET id = ns->m_SocketID;
ns->m_pUDT->close();
ns->m_Status = SRTS_CLOSED;
ns->m_tsClosureTimeStamp = srt::sync::steady_clock::now();
ns->m_tsClosureTimeStamp = steady_clock::now();
// The mapped socket should be now unmapped to preserve the situation that
// was in the original UDT code.
// In SRT additionally the acceptAndRespond() function (it was called probably
Expand All @@ -518,9 +516,7 @@ int CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr_any& peer,
}

// wake up a waiting accept() call
pthread_mutex_lock(&(ls->m_AcceptLock));
pthread_cond_signal(&(ls->m_AcceptCond));
pthread_mutex_unlock(&(ls->m_AcceptLock));
CSync::lock_signal(ls->m_AcceptCond, ls->m_AcceptLock);

return 1;
}
Expand Down Expand Up @@ -725,6 +721,7 @@ SRTSOCKET CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int* pw_
while (!accepted)
{
CGuard cg(ls->m_AcceptLock);
CSync axcond(ls->m_AcceptCond, cg);

if ((ls->m_Status != SRTS_LISTENING) || ls->m_pUDT->m_bBroken)
{
Expand Down Expand Up @@ -763,7 +760,7 @@ SRTSOCKET CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int* pw_
}

if (!accepted && (ls->m_Status == SRTS_LISTENING))
pthread_cond_wait(&(ls->m_AcceptCond), &(ls->m_AcceptLock));
axcond.wait();

if (ls->m_pQueuedSockets->empty())
m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);
Expand Down Expand Up @@ -850,7 +847,7 @@ int CUDTUnited::connect(const SRTSOCKET u, const sockaddr* name, int namelen, in
{
// InvertedGuard unlocks in the constructor, then locks in the
// destructor, no matter if an exception has fired.
InvertedGuard l_unlocker( s->m_pUDT->m_bSynRecving ? &s->m_ControlLock : 0 );
InvertedGuard l_unlocker (s->m_ControlLock, s->m_pUDT->m_bSynRecving);
s->m_pUDT->startConnect(target_addr, forced_isn);
}
catch (CUDTException& e) // Interceptor, just to change the state.
Expand Down Expand Up @@ -907,10 +904,7 @@ int CUDTUnited::close(const SRTSOCKET u)
}

// broadcast all "accept" waiting
pthread_mutex_lock(&(s->m_AcceptLock));
pthread_cond_broadcast(&(s->m_AcceptCond));
pthread_mutex_unlock(&(s->m_AcceptLock));

CSync::lock_broadcast(s->m_AcceptCond, s->m_AcceptLock);
}
else
{
Expand Down Expand Up @@ -1491,17 +1485,17 @@ void CUDTUnited::checkBrokenSockets()
// asynchronous close:
if ((!j->second->m_pUDT->m_pSndBuffer)
|| (0 == j->second->m_pUDT->m_pSndBuffer->getCurrBufSize())
|| (j->second->m_pUDT->m_tsLingerExpiration <= srt::sync::steady_clock::now()))
|| (j->second->m_pUDT->m_tsLingerExpiration <= steady_clock::now()))
{
j->second->m_pUDT->m_tsLingerExpiration = steady_clock::time_point();
j->second->m_pUDT->m_bClosing = true;
j->second->m_tsClosureTimeStamp = srt::sync::steady_clock::now();
j->second->m_tsClosureTimeStamp = steady_clock::now();
}
}

// timeout 1 second to destroy a socket AND it has been removed from
// RcvUList
if ((srt::sync::steady_clock::now() - j->second->m_tsClosureTimeStamp > seconds_from(1))
if ((steady_clock::now() - j->second->m_tsClosureTimeStamp > seconds_from(1))
&& ((!j->second->m_pUDT->m_pRNode)
|| !j->second->m_pUDT->m_pRNode->m_bOnList))
{
Expand Down Expand Up @@ -1541,7 +1535,7 @@ void CUDTUnited::removeSocket(const SRTSOCKET u)
{
m_Sockets[*q]->m_pUDT->m_bBroken = true;
m_Sockets[*q]->m_pUDT->close();
m_Sockets[*q]->m_tsClosureTimeStamp = srt::sync::steady_clock::now();
m_Sockets[*q]->m_tsClosureTimeStamp = steady_clock::now();
m_Sockets[*q]->m_Status = SRTS_CLOSED;
m_ClosedSockets[*q] = m_Sockets[*q];
m_Sockets.erase(*q);
Expand Down Expand Up @@ -1821,14 +1815,15 @@ void* CUDTUnited::garbageCollect(void* p)
THREAD_STATE_INIT("SRT:GC");

CGuard gcguard(self->m_GCStopLock);
CSync gcsync(self->m_GCStopCond, gcguard);

while (!self->m_bClosing)
{
INCREMENT_THREAD_ITERATIONS();
self->checkBrokenSockets();

HLOGC(mglog.Debug, log << "GC: sleep 1 s");
SyncEvent::wait_for_monotonic(&self->m_GCStopCond, &self->m_GCStopLock, seconds_from(1));
gcsync.wait_for_monotonic(seconds_from(1));
}

// remove all sockets and multiplexers
Expand Down Expand Up @@ -3112,54 +3107,62 @@ SRT_SOCKSTATUS getsockstate(SRTSOCKET u)

void setloglevel(LogLevel::type ll)
{
CGuard gg(srt_logger_config.mutex);
srt_logger_config.lock();
srt_logger_config.max_level = ll;
srt_logger_config.unlock();
}

void addlogfa(LogFA fa)
{
CGuard gg(srt_logger_config.mutex);
srt_logger_config.lock();
srt_logger_config.enabled_fa.set(fa, true);
srt_logger_config.unlock();
}

void dellogfa(LogFA fa)
{
CGuard gg(srt_logger_config.mutex);
srt_logger_config.lock();
srt_logger_config.enabled_fa.set(fa, false);
srt_logger_config.unlock();
}

void resetlogfa(set<LogFA> fas)
{
CGuard gg(srt_logger_config.mutex);
srt_logger_config.lock();
for (int i = 0; i <= SRT_LOGFA_LASTNONE; ++i)
srt_logger_config.enabled_fa.set(i, fas.count(i));
srt_logger_config.unlock();
}

void resetlogfa(const int* fara, size_t fara_size)
{
CGuard gg(srt_logger_config.mutex);
srt_logger_config.lock();
srt_logger_config.enabled_fa.reset();
for (const int* i = fara; i != fara + fara_size; ++i)
srt_logger_config.enabled_fa.set(*i, true);
srt_logger_config.unlock();
}

void setlogstream(std::ostream& stream)
{
CGuard gg(srt_logger_config.mutex);
srt_logger_config.lock();
srt_logger_config.log_stream = &stream;
srt_logger_config.unlock();
}

void setloghandler(void* opaque, SRT_LOG_HANDLER_FN* handler)
{
CGuard gg(srt_logger_config.mutex);
srt_logger_config.lock();
srt_logger_config.loghandler_opaque = opaque;
srt_logger_config.loghandler_fn = handler;
srt_logger_config.unlock();
}

void setlogflags(int flags)
{
CGuard gg(srt_logger_config.mutex);
srt_logger_config.lock();
srt_logger_config.flags = flags;
srt_logger_config.unlock();
}

SRT_API bool setstreamid(SRTSOCKET u, const std::string& sid)
Expand Down
18 changes: 9 additions & 9 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ class CUDTSocket
std::set<SRTSOCKET>* m_pQueuedSockets; //< set of connections waiting for accept()
std::set<SRTSOCKET>* m_pAcceptSockets; //< set of accept()ed connections

pthread_cond_t m_AcceptCond; //< used to block "accept" call
pthread_mutex_t m_AcceptLock; //< mutex associated to m_AcceptCond
srt::sync::CCondition m_AcceptCond; //< used to block "accept" call
srt::sync::CMutex m_AcceptLock; //< mutex associated to m_AcceptCond

unsigned int m_uiBackLog; //< maximum number of connections in queue

int m_iMuxID; //< multiplexer ID

pthread_mutex_t m_ControlLock; //< lock this socket exclusively for control APIs: bind/listen/connect
srt::sync::CMutex m_ControlLock; //< lock this socket exclusively for control APIs: bind/listen/connect

static int64_t getPeerSpec(SRTSOCKET id, int32_t isn)
{
Expand Down Expand Up @@ -212,9 +212,9 @@ friend class CRendezvousQueue;
private:
std::map<SRTSOCKET, CUDTSocket*> m_Sockets; // stores all the socket structures

pthread_mutex_t m_GlobControlLock; // used to synchronize UDT API
srt::sync::CMutex m_GlobControlLock; // used to synchronize UDT API

pthread_mutex_t m_IDLock; // used to synchronize ID generation
srt::sync::CMutex m_IDLock; // used to synchronize ID generation
SRTSOCKET m_SocketIDGenerator; // seed to generate a new unique socket ID

std::map<int64_t, std::set<SRTSOCKET> > m_PeerRec;// record sockets from peers to avoid repeated connection request, int64_t = (socker_id << 30) + isn
Expand All @@ -231,17 +231,17 @@ friend class CRendezvousQueue;

private:
std::map<int, CMultiplexer> m_mMultiplexer; // UDP multiplexer
pthread_mutex_t m_MultiplexerLock;
srt::sync::CMutex m_MultiplexerLock;

private:
CCache<CInfoBlock>* m_pCache; // UDT network information cache

private:
volatile bool m_bClosing;
pthread_mutex_t m_GCStopLock;
pthread_cond_t m_GCStopCond;
srt::sync::CMutex m_GCStopLock;
srt::sync::CCondition m_GCStopCond;

pthread_mutex_t m_InitLock;
srt::sync::CMutex m_InitLock;
int m_iInstanceCount; // number of startup() called by application
bool m_bGCStatus; // if the GC thread is working (true)

Expand Down
10 changes: 5 additions & 5 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ CSndBuffer::CSndBuffer(int size, int mss)

m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock;

pthread_mutex_init(&m_BufLock, NULL);
createMutex(m_BufLock, "Buf");
}

CSndBuffer::~CSndBuffer()
Expand All @@ -135,7 +135,7 @@ CSndBuffer::~CSndBuffer()
delete temp;
}

pthread_mutex_destroy(&m_BufLock);
releaseMutex(m_BufLock);
}

void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order, uint64_t srctime, ref_t<int32_t> r_msgno)
Expand Down Expand Up @@ -712,7 +712,7 @@ m_iNotch(0)
memset(m_TsbPdDriftHisto1ms, 0, sizeof(m_TsbPdDriftHisto1ms));
#endif

pthread_mutex_init(&m_BytesCountLock, NULL);
createMutex(m_BytesCountLock, "BytesCount");
}

CRcvBuffer::~CRcvBuffer()
Expand All @@ -727,7 +727,7 @@ CRcvBuffer::~CRcvBuffer()

delete [] m_pUnit;

pthread_mutex_destroy(&m_BytesCountLock);
releaseMutex(m_BytesCountLock);
}

void CRcvBuffer::countBytes(int pkts, int bytes, bool acked)
Expand Down Expand Up @@ -1556,7 +1556,7 @@ void CRcvBuffer::printDriftOffset(int tsbPdOffset, int tsbPdDriftAvg)
}
#endif /* SRT_DEBUG_TSBPD_DRIFT */

void CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us, pthread_mutex_t& mutex_to_lock)
void CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us, CMutex& mutex_to_lock)
{
if (!m_bTsbPdMode) // Not checked unless in TSBPD mode
return;
Expand Down
6 changes: 3 additions & 3 deletions srtcore/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class CSndBuffer
static const int INPUTRATE_INITIAL_BYTESPS = BW_INFINITE;

private:
pthread_mutex_t m_BufLock; // used to synchronize buffer operation
srt::sync::CMutex m_BufLock; // used to synchronize buffer operation

struct Block
{
Expand Down Expand Up @@ -376,7 +376,7 @@ class CRcvBuffer
/// @param [in] timestamp packet time stamp
/// @param [ref] lock Mutex that should be locked for the operation

void addRcvTsbPdDriftSample(uint32_t timestamp, pthread_mutex_t& lock);
void addRcvTsbPdDriftSample(uint32_t timestamp, srt::sync::CMutex& lock);

#ifdef SRT_DEBUG_TSBPD_DRIFT
void printDriftHistogram(int64_t iDrift);
Expand Down Expand Up @@ -514,7 +514,7 @@ class CRcvBuffer
// up to which data are already retrieved;
// in message reading mode it's unused and always 0)

pthread_mutex_t m_BytesCountLock; // used to protect counters operations
srt::sync::CMutex m_BytesCountLock; // used to protect counters operations
int m_iBytesCount; // Number of payload bytes in the buffer
int m_iAckedPktsCount; // Number of acknowledged pkts in the buffer
int m_iAckedBytesCount; // Number of acknowledged payload bytes in the buffer
Expand Down
Loading