Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Added busy counter for sockets and various fixes for data race problems #2893

Merged
merged 27 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0bee6d8
[test] Improved tests to avoid all-breaking asserts and inter-test me…
Feb 27, 2024
d8dfa0c
(First concept, failing tests, to be fixed)
Feb 27, 2024
79bb548
Updated and fixed
Feb 27, 2024
74592c1
Fixed more tests
Feb 27, 2024
2075a65
Fixed tab indent in CMakeLists
ethouris Feb 27, 2024
0f0597a
Merge branch 'dev-make-test-reuseaddr-fixture' into dev-add-socket-bu…
Feb 27, 2024
a82fd26
Added extra fixes for data races
Feb 28, 2024
278fc72
Fixed file transmission test on Windows
Feb 28, 2024
9459d36
Merge branch 'dev-make-test-reuseaddr-fixture' into dev-add-socket-bu…
Feb 28, 2024
cd5d771
Fixed some build breaks
Feb 28, 2024
085043a
Armed UniqueSocket in file/line grab for creation location
Feb 28, 2024
4638192
Still tracking close error on Ubuntu
Feb 29, 2024
ca98ee3
Added explicit closing of client socket to avoid auto-close-broken pr…
Feb 29, 2024
df2f8e7
Merge branch 'dev-make-test-reuseaddr-fixture' into dev-add-socket-bu…
Feb 29, 2024
212750d
Updated and merged
Mar 13, 2024
f56dba5
Updated to latest upstream. Fixed wrong comments
May 7, 2024
82bfbd3
Updated to latest master
Jul 18, 2024
11dd050
Applied fixes from code review
Jul 18, 2024
08a4010
Fixed constness of isStillBusy
Jul 18, 2024
6e2496a
Updated failing test to track the failure cause
Jul 18, 2024
326d324
Some sanitizer and warning fixes
Jul 25, 2024
ed5245d
Updated and fixed
Aug 14, 2024
97b26d9
Added guarding the socket for the whole length of a packet dispatchin…
Aug 15, 2024
9bccc3e
Updated and fixed
Aug 15, 2024
fb816e3
Merge remote-tracking branch 'ethouris/dev-add-socket-busy-counter' i…
Aug 15, 2024
abb99dc
Fixed per code review
Aug 19, 2024
85f7922
Merge remote-tracking branch 'ethouris/dev-add-socket-busy-counter' i…
Aug 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 78 additions & 6 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1908,11 +1908,29 @@ int srt::CUDTUnited::close(const SRTSOCKET u)
return 0;
}
#endif
CUDTSocket* s = locateSocket(u);
if (!s)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
#if ENABLE_HEAVY_LOGGING
// Wrapping the log into a destructor so that it
// is printed AFTER the destructor of SocketKeeper.
struct ForceDestructor
{
CUDTSocket* ps;
ForceDestructor(): ps(NULL){}
~ForceDestructor()
{
if (ps) // Could be not acquired by SocketKeeper, occasionally
{
HLOGC(smlog.Debug, log << "CUDTUnited::close/end: @" << ps->m_SocketID << " busy=" << ps->isStillBusy());
}
}
} fod;
#endif

SocketKeeper k(*this, u, ERH_THROW);
IF_HEAVY_LOGGING(fod.ps = k.socket);
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
HLOGC(smlog.Debug, log << "CUDTUnited::close/begin: @" << u << " busy=" << k.socket->isStillBusy());
int ret = close(k.socket);

return close(s);
return ret;
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
}

#if ENABLE_BONDING
Expand Down Expand Up @@ -2541,6 +2559,45 @@ srt::CUDTGroup* srt::CUDTUnited::acquireSocketsGroup(CUDTSocket* s)
}
#endif

srt::CUDTSocket* srt::CUDTUnited::locateAcquireSocket(SRTSOCKET u, ErrorHandling erh)
{
ScopedLock cg(m_GlobControlLock);

CUDTSocket* s = locateSocket_LOCKED(u);
if (!s)
{
if (erh == ERH_THROW)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
return NULL;
}

s->apiAcquire();
return s;
}

bool srt::CUDTUnited::acquireSocket(CUDTSocket* s)
{
// Note that before using this function you must be certain
// that the socket isn't broken already and it still has at least
// one more GC cycle to live. In other words, you must be certain
// that this pointer passed here isn't dangling and was obtained
// directly from m_Sockets, or even better, has been acquired
// by some other functionality already, which is only about to
// be released earlier than you need.
ScopedLock cg(m_GlobControlLock);
s->apiAcquire();
// Keep the lock so that no one changes anything in the meantime.
// If the socket m_Status == SRTS_CLOSED (set by setClosed()), then
// this socket is no longer present in the m_Sockets container
if (s->m_Status >= SRTS_BROKEN)
{
s->apiRelease();
return false;
}

return true;
}

srt::CUDTSocket* srt::CUDTUnited::locatePeer(const sockaddr_any& peer, const SRTSOCKET id, int32_t isn)
{
ScopedLock cg(m_GlobControlLock);
Expand Down Expand Up @@ -2607,7 +2664,7 @@ void srt::CUDTUnited::checkBrokenSockets()

if (s->m_Status == SRTS_LISTENING)
{
const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp;
const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp.load();
// A listening socket should wait an extra 3 seconds
// in case a client is connecting.
if (elapsed < milliseconds_from(CUDT::COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS))
Expand Down Expand Up @@ -2666,6 +2723,13 @@ void srt::CUDTUnited::checkBrokenSockets()
for (sockets_t::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++j)
{
CUDTSocket* ps = j->second;

if (ps->isStillBusy())
{
HLOGC(smlog.Debug, log << "checkBrokenSockets: @" << ps->m_SocketID << " is still busy, SKIPPING THIS CYCLE.");
continue;
}

maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
CUDT& u = ps->core();

// HLOGC(smlog.Debug, log << "checking CLOSED socket: " << j->first);
Expand All @@ -2685,7 +2749,7 @@ void srt::CUDTUnited::checkBrokenSockets()
// timeout 1 second to destroy a socket AND it has been removed from
// RcvUList
const steady_clock::time_point now = steady_clock::now();
const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp;
const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp.load();
if (closed_ago > seconds_from(1))
{
CRNode* rnode = u.m_pRNode;
Expand Down Expand Up @@ -2735,6 +2799,14 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
if (rn && rn->m_bOnList)
return;

if (s->isStillBusy())
{
HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " is still busy, NOT deleting");
return;
}

LOGC(smlog.Note, log << "@" << s->m_SocketID << " busy=" << s->isStillBusy());

#if ENABLE_BONDING
if (s->m_GroupOf)
{
Expand Down
59 changes: 57 additions & 2 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,18 @@ class CUDTSocket

void construct();

private:
srt::sync::atomic<int> m_iBusy;
public:
void apiAcquire() { ++m_iBusy; }
void apiRelease() { --m_iBusy; }

int isStillBusy()
{
return m_iBusy;
}


SRT_ATTR_GUARDED_BY(m_ControlLock)
sync::atomic<SRT_SOCKSTATUS> m_Status; //< current socket state

Expand All @@ -131,7 +143,8 @@ class CUDTSocket
/// of sockets in order to prevent other methods from accessing invalid address.
/// A timer is started and the socket will be removed after approximately
/// 1 second (see CUDTUnited::checkBrokenSockets()).
sync::steady_clock::time_point m_tsClosureTimeStamp;
//sync::steady_clock::time_point m_tsClosureTimeStamp;
sync::AtomicClock<sync::steady_clock> m_tsClosureTimeStamp;

sockaddr_any m_SelfAddr; //< local address of the socket
sockaddr_any m_PeerAddr; //< peer address of the socket
Expand Down Expand Up @@ -442,8 +455,50 @@ class CUDTUnited
}
}
};

#endif

CUDTSocket* locateAcquireSocket(SRTSOCKET u, ErrorHandling erh = ERH_RETURN);
bool acquireSocket(CUDTSocket* s);

public:
struct SocketKeeper
{
CUDTSocket* socket;

SocketKeeper(): socket(NULL) {}

// This is intended for API functions to lock the socket's existence
// for the lifetime of their call.
SocketKeeper(CUDTUnited& glob, SRTSOCKET id, ErrorHandling erh = ERH_RETURN) { socket = glob.locateAcquireSocket(id, erh); }

// This is intended for TSBPD thread that should lock the socket's
// existence until it exits.
SocketKeeper(CUDTUnited& glob, CUDTSocket* s)
{
acquire(glob, s);
}

// Note: acquire doesn't check if the keeper already keeps anything.
// This is only for a use together with an empty constructor.
bool acquire(CUDTUnited& glob, CUDTSocket* s)
{
bool caught = glob.acquireSocket(s);
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
socket = caught ? s : NULL;
return caught;
}

~SocketKeeper()
{
if (socket)
{
SRT_ASSERT(socket->isStillBusy() > 0);
socket->apiRelease();
}
}
};

private:

void updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* = NULL);
bool updateListenerMux(CUDTSocket* s, const CUDTSocket* ls);

Expand Down
36 changes: 22 additions & 14 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7121,7 +7121,7 @@

do
{
if (stillConnected() && !timeout && !m_pRcvBuffer->isRcvDataReady(steady_clock::now()))
if (stillConnected() && !timeout && !isRcvBufferReady())
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
{
/* Kick TsbPd thread to schedule next wakeup (if running) */
if (m_bTsbPd)
Expand Down Expand Up @@ -8723,11 +8723,14 @@
// srt_recvfile (which doesn't make any sense), you'll have a deadlock.
if (m_config.bDriftTracer)
{
//enterCS(m_RcvBufferLock);
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Show resolved Hide resolved

#if ENABLE_BONDING
ScopedLock glock(uglobal().m_GlobControlLock);
ScopedLock glock(uglobal().m_GlobControlLock); // XXX not too excessive?
const bool drift_updated =
#endif
m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt);
//leaveCS(m_RcvBufferLock);
Fixed Show fixed Hide fixed

#if ENABLE_BONDING
if (drift_updated && m_parent->m_GroupOf)
Expand Down Expand Up @@ -9993,10 +9996,12 @@
{
const bool need_tsbpd = m_bTsbPd || m_bGroupTsbPd;

if (need_tsbpd && !m_RcvTsbPdThread.joinable())
{
ScopedLock lock(m_RcvTsbPdStartupLock);
if (!need_tsbpd)
return 0;
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved

ScopedLock lock(m_RcvTsbPdStartupLock);
if (!m_RcvTsbPdThread.joinable())
{
if (m_bClosing) // Check again to protect join() in CUDT::releaseSync()
return -1;

Expand Down Expand Up @@ -11701,17 +11706,20 @@
// Bound to one call because this requires locking
pg->updateFailedLink();
}
// Sockets that never succeeded to connect must be deleted
// explicitly, otherwise they will never be deleted. OTOH
// the socket can be on the path of deletion already, so
// this only makes sure that the socket will be deleted,
// one way or another.
if (pending_broken)
{
// XXX This somehow can cause a deadlock
// uglobal()->close(m_parent);
LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group.");
m_parent->setBrokenClosed();
}
}

// Sockets that never succeeded to connect must be deleted
// explicitly, otherwise they will never be deleted.
if (pending_broken)
{
// XXX This somehow can cause a deadlock
// uglobal()->close(m_parent);
LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group.");
m_parent->setBrokenClosed();
}
#endif
}

Expand Down
35 changes: 32 additions & 3 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,13 @@ void* srt::CSndQueue::worker(void* param)
continue;
}

CUDTUnited::SocketKeeper sk (CUDT::uglobal(), u->id());
if (!sk.socket)
{
HLOGC(qslog.Debug, log << "Socket to be processed was deleted in the meantime, not packing");
continue;
}

// pack a packet from the socket
CPacket pkt;
steady_clock::time_point next_send_time;
Expand All @@ -588,7 +595,6 @@ void* srt::CSndQueue::worker(void* param)
IF_DEBUG_HIGHRATE(self->m_WorkerStats.lNotReadyPop++);
continue;
}

const sockaddr_any addr = u->m_PeerAddr;
if (!is_zero(next_send_time))
self->m_pSndUList->update(u, CSndUList::DO_RESCHEDULE, next_send_time);
Expand Down Expand Up @@ -931,6 +937,16 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
EReadStatus read_st = rst;
EConnectStatus conn_st = cst;

CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id);
if (!sk.socket)
{
// Socket deleted already, so stop this and proceed to the next loop.
LOGC(cnlog.Error, log << "updateConnStatus: IPE: socket @" << i->id << " already closed, proceed to only removal from lists");
toRemove.push_back(*i);
continue;
}


if (cst != CONN_RENDEZVOUS && dest_id != 0)
{
if (i->id != dest_id)
Expand Down Expand Up @@ -976,14 +992,22 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
for (vector<LinkStatusInfo>::iterator i = toRemove.begin(); i != toRemove.end(); ++i)
{
HLOGC(cnlog.Debug, log << "updateConnStatus: COMPLETING dep objects update on failed @" << i->id);
//
remove(i->id);

CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id);
if (!sk.socket)
{
// This actually shall never happen, so it's a kind of paranoid check.
LOGC(cnlog.Error, log << "updateConnStatus: IPE: socket @" << i->id << " already closed, NOT ACCESSING its contents");
continue;
}

// Setting m_bConnecting to false, and need to remove the socket from the rendezvous queue
// because the next CUDT::close will not remove it from the queue when m_bConnecting = false,
// and may crash on next pass.
//
// TODO: maybe lock i->u->m_ConnectionLock?
i->u->m_bConnecting = false;
remove(i->u->m_SocketID);

// DO NOT close the socket here because in this case it might be
// unable to get status from at the right moment. Also only member
Expand All @@ -994,6 +1018,11 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
CUDT::uglobal().m_EPoll.update_events(
i->u->m_SocketID, i->u->m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true);

// Make sure that the socket wasn't deleted in the meantime.
// Skip this part if it was. Note also that if the socket was
// decided to be deleted, it's already moved to m_ClosedSockets
// and should have been therefore already processed for deletion.

i->u->completeBrokenConnectionDependencies(i->errorcode);
}

Expand Down
Loading