Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix a crash inside updateConnStatus #2194

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 43 additions & 25 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ bool srt::CUDTSocket::broken() const
srt::CUDTUnited::CUDTUnited():
m_Sockets(),
m_GlobControlLock(),
m_UpdateConnStatusLock(),
m_IDLock(),
m_mMultiplexer(),
m_MultiplexerLock(),
Expand All @@ -197,6 +198,7 @@ srt::CUDTUnited::CUDTUnited():
// might destroy the application before `main`. This shouldn't
// be a problem in general.
setupMutex(m_GlobControlLock, "GlobControl");
setupMutex(m_UpdateConnStatusLock, "UpdateConnectionStatus");
setupMutex(m_IDLock, "ID");
setupMutex(m_InitLock, "Init");

Expand All @@ -214,6 +216,7 @@ srt::CUDTUnited::~CUDTUnited()
}

releaseMutex(m_GlobControlLock);
releaseMutex(m_UpdateConnStatusLock);
releaseMutex(m_IDLock);
releaseMutex(m_InitLock);

Expand Down Expand Up @@ -2570,8 +2573,9 @@ srt::CUDTSocket* srt::CUDTUnited::locatePeer(
return NULL;
}

void srt::CUDTUnited::checkBrokenSockets()
void srt::CUDTUnited::checkBrokenSockets(PlexerList& toDestroy)
{
ScopedLock us(m_UpdateConnStatusLock);
ScopedLock cg(m_GlobControlLock);

#if ENABLE_EXPERIMENTAL_BONDING
Expand Down Expand Up @@ -2710,11 +2714,11 @@ void srt::CUDTUnited::checkBrokenSockets()

// remove those timeout sockets
for (vector<SRTSOCKET>::iterator l = tbr.begin(); l != tbr.end(); ++ l)
removeSocket(*l);
removeSocket(*l, toDestroy);
}

// [[using locked(m_GlobControlLock)]]
void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
void srt::CUDTUnited::removeSocket(const SRTSOCKET u, PlexerList& toDestroy)
{
sockets_t::iterator i = m_ClosedSockets.find(u);

Expand Down Expand Up @@ -2809,26 +2813,38 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
LOGC(smlog.Fatal, log << "IPE: For socket @" << u << " MUXER id=" << mid << " NOT FOUND!");
return;
}
toDestroy.push_back(m);


CMultiplexer& mx = m->second;

}

mx.m_iRefCount --;
// HLOGF(smlog.Debug, "unrefing underlying socket for %u: %u\n",
// u, mx.m_iRefCount);
if (0 == mx.m_iRefCount)
{
HLOGC(smlog.Debug, log << "MUXER id=" << mid << " lost last socket @"
<< u << " - deleting muxer bound to port "
<< mx.m_pChannel->bindAddressAny().hport());
// The channel has no access to the queues and
// it looks like the multiplexer is the master of all of them.
// The queues must be silenced before closing the channel
// because this will cause error to be returned in any operation
// being currently done in the queues, if any.
mx.m_pSndQueue->setClosing();
mx.m_pRcvQueue->setClosing();
mx.destroy();
m_mMultiplexer.erase(m);
void srt::CUDTUnited::tryDestroyMuxer(PlexerList& toDestroy)
{
ScopedLock cg(m_GlobControlLock);
for (PlexerList::iterator it = toDestroy.begin(); it != toDestroy.end(); ++it)
{
std::map<int, CMultiplexer>::iterator m = *it;
CMultiplexer& mx = m->second;
mx.m_iRefCount--;
// HLOGF(smlog.Debug, "unrefing underlying socket for %u: %u\n",
// u, mx.m_iRefCount);

if (0 == mx.m_iRefCount)
{
HLOGC(smlog.Debug,
log << "MUXER id=" << mx.m_iID << " lost last socket - deleting muxer bound to port "
<< mx.m_pChannel->bindAddressAny().hport());
// The channel has no access to the queues and
// it looks like the multiplexer is the master of all of them.
// The queues must be silenced before closing the channel
// because this will cause error to be returned in any operation
// being currently done in the queues, if any.
mx.m_pSndQueue->setClosing();
mx.m_pRcvQueue->setClosing();
mx.destroy();
m_mMultiplexer.erase(m);
}
}
}

Expand Down Expand Up @@ -3132,8 +3148,9 @@ void* srt::CUDTUnited::garbageCollect(void* p)
while (!self->m_bClosing)
{
INCREMENT_THREAD_ITERATIONS();
self->checkBrokenSockets();

PlexerList PlexerstoDestroy;
self->checkBrokenSockets(PlexerstoDestroy);
self->tryDestroyMuxer(PlexerstoDestroy);
HLOGC(inlog.Debug, log << "GC: sleep 1 s");
self->m_GCStopCond.wait_for(gclock, seconds_from(1));
}
Expand Down Expand Up @@ -3185,8 +3202,9 @@ void* srt::CUDTUnited::garbageCollect(void* p)
HLOGC(inlog.Debug, log << "GC: GLOBAL EXIT - releasing all CLOSED sockets.");
while (true)
{
self->checkBrokenSockets();

PlexerList PlexerstoDestroy;
self->checkBrokenSockets(PlexerstoDestroy);
self->tryDestroyMuxer(PlexerstoDestroy);
enterCS(self->m_GlobControlLock);
bool empty = self->m_ClosedSockets.empty();
leaveCS(self->m_GlobControlLock);
Expand Down
10 changes: 6 additions & 4 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ friend class CRendezvousQueue;

sync::Mutex m_GlobControlLock; // used to synchronize UDT API

sync::Mutex m_UpdateConnStatusLock; // used to synchronize Garbage Collector and UpdateConnectionStatus

sync::Mutex m_IDLock; // used to synchronize ID generation

SRTSOCKET m_SocketIDGenerator; // seed to generate a new unique socket ID
Expand Down Expand Up @@ -472,10 +474,10 @@ friend class CRendezvousQueue;
#if ENABLE_EXPERIMENTAL_BONDING
groups_t m_ClosedGroups;
#endif

void checkBrokenSockets();
void removeSocket(const SRTSOCKET u);

typedef std::list<std::map<int, CMultiplexer>::iterator> PlexerList;
void checkBrokenSockets(PlexerList& toDestroy);
void removeSocket(const SRTSOCKET u, PlexerList& toDestroy);
void tryDestroyMuxer(PlexerList& toDestroy);
CEPoll m_EPoll; // handling epoll data structures and events

private:
Expand Down
2 changes: 2 additions & 0 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,8 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
// Need a stub value for a case when there's no unit provided ("storage depleted" case).
// It should be normally NOT IN USE because in case of "storage depleted", rst != RST_OK.
const SRTSOCKET dest_id = pkt ? pkt->m_iID : 0;

ScopedLock us(CUDT::uglobal().m_UpdateConnStatusLock);

// If no socket were qualified for further handling, finish here.
// Otherwise toRemove and toProcess contain items to handle.
Expand Down