Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Added busy counter for sockets and various fixes for data race problems #2893

Merged
merged 27 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0bee6d8
[test] Improved tests to avoid all-breaking asserts and inter-test me…
Feb 27, 2024
d8dfa0c
(First concept, failing tests, to be fixed)
Feb 27, 2024
79bb548
Updated and fixed
Feb 27, 2024
74592c1
Fixed more tests
Feb 27, 2024
2075a65
Fixed tab indent in CMakeLists
ethouris Feb 27, 2024
0f0597a
Merge branch 'dev-make-test-reuseaddr-fixture' into dev-add-socket-bu…
Feb 27, 2024
a82fd26
Added extra fixes for data races
Feb 28, 2024
278fc72
Fixed file transmission test on Windows
Feb 28, 2024
9459d36
Merge branch 'dev-make-test-reuseaddr-fixture' into dev-add-socket-bu…
Feb 28, 2024
cd5d771
Fixed some build breaks
Feb 28, 2024
085043a
Armed UniqueSocket in file/line grab for creation location
Feb 28, 2024
4638192
Still tracking close error on Ubuntu
Feb 29, 2024
ca98ee3
Added explicit closing of client socket to avoid auto-close-broken pr…
Feb 29, 2024
df2f8e7
Merge branch 'dev-make-test-reuseaddr-fixture' into dev-add-socket-bu…
Feb 29, 2024
212750d
Updated and merged
Mar 13, 2024
f56dba5
Updated to latest upstream. Fixed wrong comments
May 7, 2024
82bfbd3
Updated to latest master
Jul 18, 2024
11dd050
Applied fixes from code review
Jul 18, 2024
08a4010
Fixed constness of isStillBusy
Jul 18, 2024
6e2496a
Updated failing test to track the failure cause
Jul 18, 2024
326d324
Some sanitizer and warning fixes
Jul 25, 2024
ed5245d
Updated and fixed
Aug 14, 2024
97b26d9
Added guarding the socket for the whole length of a packet dispatchin…
Aug 15, 2024
9bccc3e
Updated and fixed
Aug 15, 2024
fb816e3
Merge remote-tracking branch 'ethouris/dev-add-socket-busy-counter' i…
Aug 15, 2024
abb99dc
Fixed per code review
Aug 19, 2024
85f7922
Merge remote-tracking branch 'ethouris/dev-add-socket-busy-counter' i…
Aug 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 84 additions & 6 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1914,11 +1914,28 @@ int srt::CUDTUnited::close(const SRTSOCKET u)
return 0;
}
#endif
CUDTSocket* s = locateSocket(u);
if (!s)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
#if ENABLE_HEAVY_LOGGING
// Wrapping the log into a destructor so that it
// is printed AFTER the destructor of SocketKeeper.
struct ScopedExitLog
{
const CUDTSocket* const ps;
ScopedExitLog(const CUDTSocket* p): ps(p){}
~ScopedExitLog()
{
if (ps) // Could be not acquired by SocketKeeper, occasionally
{
HLOGC(smlog.Debug, log << "CUDTUnited::close/end: @" << ps->m_SocketID << " busy=" << ps->isStillBusy());
}
}
};
#endif

SocketKeeper k(*this, u, ERH_THROW);
IF_HEAVY_LOGGING(ScopedExitLog slog(k.socket));
HLOGC(smlog.Debug, log << "CUDTUnited::close/begin: @" << u << " busy=" << k.socket->isStillBusy());

return close(s);
return close(k.socket);
}

#if ENABLE_BONDING
Expand Down Expand Up @@ -2547,6 +2564,45 @@ srt::CUDTGroup* srt::CUDTUnited::acquireSocketsGroup(CUDTSocket* s)
}
#endif

srt::CUDTSocket* srt::CUDTUnited::locateAcquireSocket(SRTSOCKET u, ErrorHandling erh)
{
ScopedLock cg(m_GlobControlLock);

CUDTSocket* s = locateSocket_LOCKED(u);
if (!s)
{
if (erh == ERH_THROW)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
return NULL;
}

s->apiAcquire();
return s;
}

bool srt::CUDTUnited::acquireSocket(CUDTSocket* s)
{
// Note that before using this function you must be certain
// that the socket isn't broken already and it still has at least
// one more GC cycle to live. In other words, you must be certain
// that this pointer passed here isn't dangling and was obtained
// directly from m_Sockets, or even better, has been acquired
// by some other functionality already, which is only about to
// be released earlier than you need.
ScopedLock cg(m_GlobControlLock);
s->apiAcquire();
// Keep the lock so that no one changes anything in the meantime.
// If the socket m_Status == SRTS_CLOSED (set by setClosed()), then
// this socket is no longer present in the m_Sockets container
if (s->m_Status >= SRTS_BROKEN)
{
s->apiRelease();
return false;
}

return true;
}

srt::CUDTSocket* srt::CUDTUnited::locatePeer(const sockaddr_any& peer, const SRTSOCKET id, int32_t isn)
{
ScopedLock cg(m_GlobControlLock);
Expand Down Expand Up @@ -2613,7 +2669,7 @@ void srt::CUDTUnited::checkBrokenSockets()

if (s->m_Status == SRTS_LISTENING)
{
const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp;
const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp.load();
// A listening socket should wait an extra 3 seconds
// in case a client is connecting.
if (elapsed < milliseconds_from(CUDT::COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS))
Expand Down Expand Up @@ -2672,6 +2728,20 @@ void srt::CUDTUnited::checkBrokenSockets()
for (sockets_t::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++j)
{
CUDTSocket* ps = j->second;

// NOTE: There is still a hypothetical risk here that ps
// was made busy while the socket was already moved to m_ClosedSocket,
// if the socket was acquired through CUDTUnited::acquireSocket (that is,
// busy flag acquisition was done through the CUDTSocket* pointer rather
// than through the numeric ID). Therefore this way of busy acquisition
// should be done only if at the moment of acquisition there are certainly
// other conditions applying on the socket that prevent it from being deleted.
if (ps->isStillBusy())
{
HLOGC(smlog.Debug, log << "checkBrokenSockets: @" << ps->m_SocketID << " is still busy, SKIPPING THIS CYCLE.");
continue;
}

CUDT& u = ps->core();

// HLOGC(smlog.Debug, log << "checking CLOSED socket: " << j->first);
Expand All @@ -2691,7 +2761,7 @@ void srt::CUDTUnited::checkBrokenSockets()
// timeout 1 second to destroy a socket AND it has been removed from
// RcvUList
const steady_clock::time_point now = steady_clock::now();
const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp;
const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp.load();
if (closed_ago > seconds_from(1))
{
CRNode* rnode = u.m_pRNode;
Expand Down Expand Up @@ -2741,6 +2811,14 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
if (rn && rn->m_bOnList)
return;

if (s->isStillBusy())
{
HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " is still busy, NOT deleting");
return;
}

LOGC(smlog.Note, log << "@" << s->m_SocketID << " busy=" << s->isStillBusy());

#if ENABLE_BONDING
if (s->m_GroupOf)
{
Expand Down
63 changes: 59 additions & 4 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,18 @@ class CUDTSocket

void construct();

private:
srt::sync::atomic<int> m_iBusy;
public:
void apiAcquire() { ++m_iBusy; }
void apiRelease() { --m_iBusy; }

int isStillBusy() const
{
return m_iBusy;
}


SRT_ATTR_GUARDED_BY(m_ControlLock)
sync::atomic<SRT_SOCKSTATUS> m_Status; //< current socket state

Expand All @@ -131,7 +143,8 @@ class CUDTSocket
/// of sockets in order to prevent other methods from accessing invalid address.
/// A timer is started and the socket will be removed after approximately
/// 1 second (see CUDTUnited::checkBrokenSockets()).
sync::steady_clock::time_point m_tsClosureTimeStamp;
//sync::steady_clock::time_point m_tsClosureTimeStamp;
sync::AtomicClock<sync::steady_clock> m_tsClosureTimeStamp;

sockaddr_any m_SelfAddr; //< local address of the socket
sockaddr_any m_PeerAddr; //< peer address of the socket
Expand Down Expand Up @@ -324,7 +337,7 @@ class CUDTUnited
int epoll_release(const int eid);

#if ENABLE_BONDING
// [[using locked(m_GlobControlLock)]]
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_GlobControlLock)
CUDTGroup& addGroup(SRTSOCKET id, SRT_GROUP_TYPE type)
{
// This only ensures that the element exists.
Expand All @@ -346,7 +359,7 @@ class CUDTUnited
void deleteGroup(CUDTGroup* g);
void deleteGroup_LOCKED(CUDTGroup* g);

// [[using locked(m_GlobControlLock)]]
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_GlobControlLock)
CUDTGroup* findPeerGroup_LOCKED(SRTSOCKET peergroup)
{
for (groups_t::iterator i = m_Groups.begin(); i != m_Groups.end(); ++i)
Expand Down Expand Up @@ -445,8 +458,50 @@ class CUDTUnited
}
}
};

#endif

CUDTSocket* locateAcquireSocket(SRTSOCKET u, ErrorHandling erh = ERH_RETURN);
bool acquireSocket(CUDTSocket* s);

public:
struct SocketKeeper
{
CUDTSocket* socket;

SocketKeeper(): socket(NULL) {}

// This is intended for API functions to lock the socket's existence
// for the lifetime of their call.
SocketKeeper(CUDTUnited& glob, SRTSOCKET id, ErrorHandling erh = ERH_RETURN) { socket = glob.locateAcquireSocket(id, erh); }

// This is intended for TSBPD thread that should lock the socket's
// existence until it exits.
SocketKeeper(CUDTUnited& glob, CUDTSocket* s)
{
acquire(glob, s);
}

// Note: acquire doesn't check if the keeper already keeps anything.
// This is only for a use together with an empty constructor.
bool acquire(CUDTUnited& glob, CUDTSocket* s)
{
const bool caught = glob.acquireSocket(s);
socket = caught ? s : NULL;
return caught;
}

~SocketKeeper()
{
if (socket)
{
SRT_ASSERT(socket->isStillBusy() > 0);
socket->apiRelease();
}
}
};

private:

void updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* = NULL);
bool updateListenerMux(CUDTSocket* s, const CUDTSocket* ls);

Expand Down
25 changes: 15 additions & 10 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5844,13 +5844,15 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any&
}

#if ENABLE_BONDING
m_ConnectionLock.unlock();
// The socket and the group are only linked to each other after interpretSrtHandshake(..) has been called.
// Keep the group alive for the lifetime of this function,
// and do it BEFORE acquiring m_ConnectionLock to avoid
// lock inversion.
// This will check if a socket belongs to a group and if so
// it will remember this group and keep it alive here.
CUDTUnited::GroupKeeper group_keeper(uglobal(), m_parent);
m_ConnectionLock.lock();
#endif

if (!prepareBuffers(NULL))
Expand Down Expand Up @@ -8748,7 +8750,7 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr
if (m_config.bDriftTracer)
{
#if ENABLE_BONDING
ScopedLock glock(uglobal().m_GlobControlLock);
ScopedLock glock(uglobal().m_GlobControlLock); // XXX not too excessive?
const bool drift_updated =
#endif
m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt);
Expand Down Expand Up @@ -11726,17 +11728,20 @@ void srt::CUDT::completeBrokenConnectionDependencies(int errorcode)
// Bound to one call because this requires locking
pg->updateFailedLink();
}
// Sockets that never succeeded to connect must be deleted
// explicitly, otherwise they will never be deleted. OTOH
// the socket can be on the path of deletion already, so
// this only makes sure that the socket will be deleted,
// one way or another.
if (pending_broken)
{
// XXX This somehow can cause a deadlock
// uglobal()->close(m_parent);
LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group.");
m_parent->setBrokenClosed();
}
}

// Sockets that never succeeded to connect must be deleted
// explicitly, otherwise they will never be deleted.
if (pending_broken)
{
// XXX This somehow can cause a deadlock
// uglobal()->close(m_parent);
LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group.");
m_parent->setBrokenClosed();
}
#endif
}

Expand Down
40 changes: 38 additions & 2 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,13 @@ void* srt::CSndQueue::worker(void* param)
continue;
}

CUDTUnited::SocketKeeper sk (CUDT::uglobal(), u->id());
if (!sk.socket)
{
HLOGC(qslog.Debug, log << "Socket to be processed was deleted in the meantime, not packing");
continue;
}

// pack a packet from the socket
CPacket pkt;
steady_clock::time_point next_send_time;
Expand Down Expand Up @@ -929,6 +936,16 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
EReadStatus read_st = rst;
EConnectStatus conn_st = cst;

CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id);
if (!sk.socket)
{
// Socket deleted already, so stop this and proceed to the next loop.
LOGC(cnlog.Error, log << "updateConnStatus: IPE: socket @" << i->id << " already closed, proceed to only removal from lists");
toRemove.push_back(*i);
continue;
}


if (cst != CONN_RENDEZVOUS && dest_id != 0)
{
if (i->id != dest_id)
Expand Down Expand Up @@ -974,14 +991,22 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
for (vector<LinkStatusInfo>::iterator i = toRemove.begin(); i != toRemove.end(); ++i)
{
HLOGC(cnlog.Debug, log << "updateConnStatus: COMPLETING dep objects update on failed @" << i->id);
//
remove(i->id);

CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id);
if (!sk.socket)
{
// This actually shall never happen, so it's a kind of paranoid check.
LOGC(cnlog.Error, log << "updateConnStatus: IPE: socket @" << i->id << " already closed, NOT ACCESSING its contents");
continue;
}

// Setting m_bConnecting to false, and need to remove the socket from the rendezvous queue
// because the next CUDT::close will not remove it from the queue when m_bConnecting = false,
// and may crash on next pass.
//
// TODO: maybe lock i->u->m_ConnectionLock?
i->u->m_bConnecting = false;
remove(i->u->m_SocketID);

// DO NOT close the socket here because in this case it might be
// unable to get status from at the right moment. Also only member
Expand All @@ -992,6 +1017,11 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
CUDT::uglobal().m_EPoll.update_events(
i->u->m_SocketID, i->u->m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true);

// Make sure that the socket wasn't deleted in the meantime.
// Skip this part if it was. Note also that if the socket was
// decided to be deleted, it's already moved to m_ClosedSockets
// and should have been therefore already processed for deletion.

i->u->completeBrokenConnectionDependencies(i->errorcode);
}

Expand Down Expand Up @@ -1442,6 +1472,12 @@ srt::EConnectStatus srt::CRcvQueue::worker_ProcessAddressedPacket(int32_t id, CU
HLOGC(cnlog.Debug, log << "worker_ProcessAddressedPacket: resending to QUEUED socket @" << id);
return worker_TryAsyncRend_OrStore(id, unit, addr);
}
// Although we don´t have an exclusive passing here,
// we can count on that when the socket was once present in the hash,
// it will not be deleted for at least one GC cycle. But we still need
// to maintain the object existence until it's in use.
// Note that here we are out of any locks, so m_GlobControlLock can be locked.
CUDTUnited::SocketKeeper sk (CUDT::uglobal(), u->m_parent);

// Found associated CUDT - process this as control or data packet
// addressed to an associated socket.
Expand Down
2 changes: 1 addition & 1 deletion test/test_fec_rebuilding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ TEST(TestFEC, ConnectionMess)

SRTSOCKET la[] = { l };
SRTSOCKET a = srt_accept_bond(la, 1, 2000);
ASSERT_NE(a, SRT_ERROR);
ASSERT_NE(a, SRT_ERROR) << srt_getlasterror_str();
EXPECT_EQ(connect_res.get(), SRT_SUCCESS);

// Now that the connection is established, check negotiated config
Expand Down
Loading