Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REFAX] Refactored function calls at close socket for preparing a change #2639

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 40 additions & 21 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ srt::CUDTUnited::CUDTUnited()
, m_InitLock()
, m_iInstanceCount(0)
, m_bGCStatus(false)
, m_ClosedSockets()
{
// Socket ID MUST start from a random value
m_SocketIDGenerator = genRandomInt(1, MAX_SOCKET_VAL);
Expand Down Expand Up @@ -469,6 +468,16 @@ SRTSOCKET srt::CUDTUnited::newSocket(CUDTSocket** pps)
return ns->m_SocketID;
}

// [[using locked(m_GlobControlLock)]]
void srt::CUDTUnited::swipeSocket_LOCKED(SRTSOCKET id, CUDTSocket* s, CUDTUnited::SwipeSocketTerm lateremove)
{
m_ClosedSockets[id] = s;
if (!lateremove)
{
m_Sockets.erase(id);
}
}

int srt::CUDTUnited::newConnection(const SRTSOCKET listen,
const sockaddr_any& peer,
const CPacket& hspkt,
Expand Down Expand Up @@ -826,8 +835,7 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen,
ns->removeFromGroup(true);
}
#endif
m_Sockets.erase(id);
m_ClosedSockets[id] = ns;
swipeSocket_LOCKED(id, ns, SWIPE_NOW);
}

return -1;
Expand Down Expand Up @@ -2020,8 +2028,7 @@ int srt::CUDTUnited::close(CUDTSocket* s)
}
#endif

m_Sockets.erase(s->m_SocketID);
m_ClosedSockets[s->m_SocketID] = s;
swipeSocket_LOCKED(s->m_SocketID, s, SWIPE_NOW);
HLOGC(smlog.Debug, log << "@" << u << "U::close: Socket MOVED TO CLOSED for collecting later.");

CGlobEvent::triggerEvent();
Expand Down Expand Up @@ -2623,7 +2630,9 @@ void srt::CUDTUnited::checkBrokenSockets()
// close broken connections and start removal timer
s->setClosed();
tbc.push_back(i->first);
m_ClosedSockets[i->first] = s;

// NOTE: removal from m_SocketID POSTPONED.
swipeSocket_LOCKED(i->first, s, SWIPE_LATER);

// remove from listener's queue
sockets_t::iterator ls = m_Sockets.find(s->m_ListenSocket);
Expand Down Expand Up @@ -2716,9 +2725,6 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
s->removeFromGroup(true);
}
#endif
// decrease multiplexer reference count, and remove it if necessary
const int mid = s->m_iMuxID;

{
ScopedLock cg(s->m_AcceptLock);

Expand All @@ -2739,8 +2745,7 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
CUDTSocket* as = si->second;

as->breakSocket_LOCKED();
m_ClosedSockets[*q] = as;
m_Sockets.erase(*q);
swipeSocket_LOCKED(*q, as, SWIPE_NOW);
}
}

Expand All @@ -2765,33 +2770,39 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)

HLOGC(smlog.Debug, log << "GC/removeSocket: closing associated UDT @" << u);
s->core().closeInternal();
removeMux(s);
HLOGC(smlog.Debug, log << "GC/removeSocket: DELETING SOCKET @" << u);
delete s;
HLOGC(smlog.Debug, log << "GC/removeSocket: socket @" << u << " DELETED. Checking muxer.");
}

if (mid == -1)
// decrease multiplexer reference count, and remove it if necessary
// [[using locked(m_GlobControlLock)]]
void srt::CUDTUnited::removeMux(CUDTSocket* s)
{
int mid = s->m_iMuxID;
if (mid == -1) // Ignore those already removed
{
HLOGC(smlog.Debug, log << "GC/removeSocket: no muxer found, finishing.");
HLOGC(smlog.Debug, log << "removeMux: @" << s->m_SocketID << " has no muxer, ok.");
return;
}

map<int, CMultiplexer>::iterator m;
m = m_mMultiplexer.find(mid);
if (m == m_mMultiplexer.end())
{
LOGC(smlog.Fatal, log << "IPE: For socket @" << u << " MUXER id=" << mid << " NOT FOUND!");
LOGC(smlog.Fatal, log << "IPE: For socket @" << s->m_SocketID << " MUXER id=" << mid << " NOT FOUND!");
return;
}

CMultiplexer& mx = m->second;

mx.m_iRefCount--;
HLOGC(smlog.Debug, log << "unrefing underlying muxer " << mid << " for @" << u << ", ref=" << mx.m_iRefCount);
if (0 == mx.m_iRefCount)
HLOGC(smlog.Debug, log << "unrefing underlying muxer " << mid << " for @" << s->m_SocketID << ", ref=" << mx.m_iRefCount);
if (mx.m_iRefCount <= 0)
{
HLOGC(smlog.Debug,
log << "MUXER id=" << mid << " lost last socket @" << u << " - deleting muxer bound to port "
<< mx.m_pChannel->bindAddressAny().hport());
HLOGC(smlog.Debug, log << "MUXER id=" << mid << " lost last socket @"
<< s->m_SocketID << " - deleting muxer bound to "
<< mx.m_pChannel->bindAddressAny().str());
// The channel has no access to the queues and
// it looks like the multiplexer is the master of all of them.
// The queues must be silenced before closing the channel
Expand All @@ -2802,6 +2813,10 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
mx.destroy();
m_mMultiplexer.erase(m);
}
else
{
HLOGC(smlog.Debug, log << "MUXER id=" << mid << " has still " << mx.m_iRefCount << " users");
}
}

void srt::CUDTUnited::configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af)
Expand Down Expand Up @@ -3130,7 +3145,11 @@ void* srt::CUDTUnited::garbageCollect(void* p)
s->removeFromGroup(false);
}
#endif
self->m_ClosedSockets[i->first] = s;

// NOTE: not removing the socket from m_Sockets.
// This is a loop over m_Sockets and after this loop ends,
// this whole container will be cleared.
self->swipeSocket_LOCKED(i->first, s, self->SWIPE_LATER);

// remove from listener's queue
sockets_t::iterator ls = self->m_Sockets.find(s->m_ListenSocket);
Expand Down
10 changes: 10 additions & 0 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,15 @@ class CUDTUnited
/// @return The new UDT socket ID, or INVALID_SOCK.
SRTSOCKET newSocket(CUDTSocket** pps = NULL);

enum SwipeSocketTerm { SWIPE_NOW = 0, SWIPE_LATER = 1 };
/// Removes the socket from the global socket container
/// and place it in the socket trashcan. The socket should
/// remain there until all still pending activities are
/// finished and there are no more users of this socket.
/// Note that the swiped socket is no longer dispatchable
/// by id.
ethouris marked this conversation as resolved.
Show resolved Hide resolved
void swipeSocket_LOCKED(SRTSOCKET id, CUDTSocket* s, SwipeSocketTerm);

/// Create (listener-side) a new socket associated with the incoming connection request.
/// @param [in] listen the listening socket ID.
/// @param [in] peer peer address.
Expand Down Expand Up @@ -446,6 +455,7 @@ class CUDTUnited
#endif
void updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* = NULL);
bool updateListenerMux(CUDTSocket* s, const CUDTSocket* ls);
void removeMux(CUDTSocket* s);

// Utility functions for updateMux
void configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af);
Expand Down