-
Notifications
You must be signed in to change notification settings - Fork 851
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Epic] Core Synchronization Issues #1610
Comments
Core threads synchronization overview. Note. tsbpd(): a separate internal thread per receiving socket. CUDT::tsbpd() (click to expand/collapse)
{
UniqueLock recv_lock (self->m_RecvLock);
CSync recvdata_cc (self->m_RecvDataCond, recv_lock);
CSync tsbpd_cc (self->m_RcvTsbPdCond, recv_lock);
while (!self->m_bClosing)
{
enterCS(self->m_RcvBufferLock);
self->m_pRcvBuffer->getRcvFirstMsg();
self->m_pRcvBuffer->skipData(seqlen);
self->m_pRcvBuffer->isRcvDataReady(..);
leaveCS(self->m_RcvBufferLock);
if (self->m_bSynRecving)
recvdata_cc.signal_locked(recv_lock);
self->s_UDTUnited.m_EPoll.update_events(self->m_SocketID, self->m_sPollID, SRT_EPOLL_IN, true);
CGlobEvent::triggerEvent();
if (tsbpdtime)
tsbpd_cc.wait_for(timediff);
else
tsbpd_cc.wait();
}
} receiveMessage()
CUDT::receiveMessage() (click to expand/collapse)
{
UniqueLock recvguard (m_RecvLock);
CSync tscond (m_RcvTsbPdCond, recvguard);
if (m_bBroken || m_bClosing)
{
enterCS(m_RcvBufferLock);
const int res = m_pRcvBuffer->readMsg(data, len);
leaveCS(m_RcvBufferLock);
if (m_bTsbPd)
{
HLOGP(tslog.Debug, "Ping TSBPD thread to schedule wakeup");
tscond.signal_locked(recvguard);
}
}
if (!m_bSynRecving)
{
enterCS(m_RcvBufferLock);
const int res = m_pRcvBuffer->readMsg(data, len, (w_mctrl), seqdistance);
leaveCS(m_RcvBufferLock);
if (res == 0)
{
if (m_bTsbPd)
{
HLOGP(arlog.Debug, "receiveMessage: nothing to read, kicking TSBPD, return AGAIN");
tscond.signal_locked(recvguard);
}
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);
return 0;
}
if (!m_pRcvBuffer->isRcvDataReady())
{
// Kick TsbPd thread to schedule next wakeup (if running)
if (m_bTsbPd)
{
HLOGP(arlog.Debug, "receiveMessage: DATA READ, but nothing more - kicking TSBPD.");
tscond.signal_locked(recvguard);
}
}
return res;
}
CSync recv_cond (m_RecvDataCond, recvguard);
do
{
if (stillConnected() && !timeout && !m_pRcvBuffer->isRcvDataReady(..))
{
/* Kick TsbPd thread to schedule next wakeup (if running) */
if (m_bTsbPd)
tscond.signal_locked(recvguard);
do
{
if (!recv_cond.wait_until(exptime)) // Unblocks m_RecvLock
{
if (m_iRcvTimeOut >= 0) // otherwise it's "no timeout set"
timeout = true;
}
} while (stillConnected() && !timeout && (!m_pRcvBuffer->isRcvDataReady()));
}
enterCS(m_RcvBufferLock);
res = m_pRcvBuffer->readMsg((data), len, (w_mctrl), seqdistance);
leaveCS(m_RcvBufferLock);
} while ((res == 0) && !timeout);
if (!m_pRcvBuffer->isRcvDataReady())
{
// Kick TsbPd thread to schedule next wakeup (if running)
if (m_bTsbPd)
tscond.signal_locked(recvguard);
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);
}
return res;
} processData(..)
CUDT::processData(..) (click to expand/collapse)
{
const bool need_tsbpd = m_bTsbPd || m_bGroupTsbPd;
// We are receiving data, start tsbpd thread if TsbPd is enabled
if (need_tsbpd && !m_RcvTsbPdThread.joinable())
{
StartThread(m_RcvTsbPdThread, CUDT::tsbpd, this, thname);
}
{
UniqueLock recvbuf_acklock(m_RcvBufferLock);
m_pRcvBuffer->addData(*i, offset);
}
// Wake up TSBPD on loss to reschedule possible TL drop
if (!srt_loss_seqs.empty() && m_bTsbPd)
CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);
if (!filter_loss_seqs.empty() && m_bTsbPd)
CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);
} releaseSynch()
CUDT::releaseSynch() (click to expand/collapse)
{
// wake up user calls
CSync::lock_signal(m_SendBlockCond, m_SendBlockLock);
enterCS(m_SendLock);
leaveCS(m_SendLock);
m_ReadyToReadEvent.lock_notify_one();
CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);
enterCS(m_RecvDataLock);
if (m_RcvTsbPdThread.joinable())
{
m_RcvTsbPdThread.join();
}
leaveCS(m_RecvDataLock);
enterCS(m_RecvLock);
leaveCS(m_RecvLock);
} |
TODO
Data race around CRcvQueue::m_bClosing
Write of size 1 at 0x7b4c0000005c by thread T1 (mutexes: write M315, write M312):
#0 CRcvQueue::setClosing() queue.h:481 (test-srt:x86_64+0x1001de909)
[NO LOCKS on modifying m_bClosing, but also calls releaseSynch that locks/unlocks/signals]
#1 CUDTUnited::removeSocket(int) api.cpp:2750 (test-srt:x86_64+0x1001de3e6)
[NO LOCKS, scoped lock of m_AcceptLock afterwards]
#2 CUDTUnited::checkBrokenSockets() api.cpp:2646 (test-srt:x86_64+0x1001dcc4e)
[LOCKS CUDTUnited::m_GlobControlLock]
#3 CUDTUnited::garbageCollect(void*) api.cpp:3040 (test-srt:x86_64+0x1001c0032)
[LOCKS CUDTUnited::m_GCStopLock]
Previous read of size 1 at 0x7b4c0000005c by thread T4:
#0 CRcvQueue::worker(void*) queue.cpp:1189 (test-srt:x86_64+0x1003b0e8b)
[NO LOCKS on reading of m_bClosing] |
Created a separate issue #2970. WARNING: ThreadSanitizer: data race (pid=2590)
Read of size 8 at 0x7ba8000000a8 by thread T3 (mutexes: write M227):
#0 memcpy ../../../../src/libsanitizer/sanitizer_common/sanitizer_common_interceptors.inc:827 (libtsan.so.0+0x6243e)
#1 memcpy ../../../../src/libsanitizer/sanitizer_common/sanitizer_common_interceptors.inc:819 (libtsan.so.0+0x6243e)
#2 srt::CSrtConfig::operator=(srt::CSrtConfig const&) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/socketconfig.h:167 (srt-xtransmit+0x2b6d98)
#3 srt::CUDT::CUDT(srt::CUDTSocket*, srt::CUDT const&) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/core.cpp:335 (srt-xtransmit+0x2796e6)
#4 srt::CUDTSocket::CUDTSocket(srt::CUDTSocket const&) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/api.h:113 (srt-xtransmit+0x24de9e)
#5 srt::CUDTUnited::newConnection(int, srt::sockaddr_any const&, srt::CPacket const&, srt::CHandShake&, int&, srt::CUDT*&) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/api.cpp:544 (srt-xtransmit+0x233312)
#6 srt::CUDT::processConnectRequest(srt::sockaddr_any const&, srt::CPacket&) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/core.cpp:11024 (srt-xtransmit+0x2b2175)
#7 srt::CRcvQueue::worker_ProcessConnectionRequest(srt::CUnit*, srt::sockaddr_any const&) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/queue.cpp:1406 (srt-xtransmit+0x309743)
#8 srt::CRcvQueue::worker(void*) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/queue.cpp:1238 (srt-xtransmit+0x30828c)
Previous write of size 1 at 0x7ba8000000a9 by main thread (mutexes: write M188, write M193, write M192):
#0 (anonymous namespace)::CSrtConfigSetter<(SRT_SOCKOPT)2>::set(srt::CSrtConfig&, void const*, int) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/socketconfig.cpp:213 (srt-xtransmit+0x31c8c3)
#1 (anonymous namespace)::dispatchSet(SRT_SOCKOPT, srt::CSrtConfig&, void const*, int) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/socketconfig.cpp:903 (srt-xtransmit+0x31acf0)
#2 srt::CSrtConfig::set(SRT_SOCKOPT, void const*, int) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/socketconfig.cpp:954 (srt-xtransmit+0x31a7fd)
#3 srt::CUDT::setOpt(SRT_SOCKOPT, void const*, int) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/core.cpp:404 (srt-xtransmit+0x27bbe4)
#4 srt::CUDT::setsockopt(int, int, SRT_SOCKOPT, void const*, int) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/api.cpp:3641 (srt-xtransmit+0x243f71)
#5 srt_setsockopt /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/srt_c_api.cpp:165 (srt-xtransmit+0x3227c9)
#6 xtransmit::socket::srt::configure_post(int) /mnt/d/Projects/srt/srt-xtransmit-multi/xtransmit/srt_socket.cpp:324 (srt-xtransmit+0x10b67a)
#7 xtransmit::socket::srt::listen() /mnt/d/Projects/srt/srt-xtransmit-multi/xtransmit/srt_socket.cpp:104 (srt-xtransmit+0x10968a)
#8 xtransmit::create_connection(std::vector<UriParser, std::allocator<UriParser> > const&, std::shared_ptr<xtransmit::socket::isocket>&) /mnt/d/Projects/srt/srt-xtransmit-multi/xtransmit/misc.cpp:70 (srt-xtransmit+0xd330f)
#9 xtransmit::common_run(std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > const&, xtransmit::stats_config const&, xtransmit::conn_config const&, std::atomic<bool> const&, std::function<void (std::shared_ptr<xtransmit::socket::isocket>, std::atomic<bool> const&)>&) /mnt/d/Projects/srt/srt-xtransmit-multi/xtransmit/misc.cpp:145 (srt-xtransmit+0xd37f6)
#10 xtransmit::receive::run(std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > const&, xtransmit::receive::config const&, std::atomic<bool> const&) /mnt/d/Projects/srt/srt-xtransmit-multi/xtransmit/receive.cpp:141 (srt-xtransmit+0xe190f)
#11 main /mnt/d/Projects/srt/srt-xtransmit-multi/xtransmit/xtransmit-app.cpp:231 (srt-xtransmit+0x127092) void socket::srt::listen()
{
int num_clients = 2;
int res = srt_listen(m_bind_socket, num_clients);
if (res == SRT_ERROR)
{
srt_close(m_bind_socket);
raise_exception("listen");
}
spdlog::debug(LOG_SOCK_SRT "0x{:X} (srt://{}:{:d}) Listening", m_bind_socket, m_host, m_port);
res = configure_post(m_bind_socket);
if (res == SRT_ERROR)
raise_exception("listen::configure_post");
} |
Destroying the m_ControlLock mutex races with its unlock in another thread. WARNING: ThreadSanitizer: data race (pid=2590)
Write of size 1 at 0x7ba800011600 by thread T1 (mutexes: write M169, write M166):
#0 pthread_mutex_destroy ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1244 (libtsan.so.0+0x39398)
#1 m_ControlLock::~Mutex() srt-xtransmit/submodule/srt/srtcore/sync_posix.cpp:215 (srt-xtransmit+0x32ae05) m_ControlLock
#2 srt::CUDTSocket::~CUDTSocket() srt-xtransmit/submodule/srt/srtcore/api.cpp:100 (srt-xtransmit+0x2302cf)
#3 srt::CUDTUnited::removeSocket(int) srt-xtransmit/submodule/srt/srtcore/api.cpp:2780 (srt-xtransmit+0x23fa6f)
#4 srt::CUDTUnited::checkBrokenSockets() srt-xtransmit/submodule/srt/srtcore/api.cpp:2696 (srt-xtransmit+0x23f1fa)
#5 srt::CUDTUnited::garbageCollect(void*) srt-xtransmit/submodule/srt/srtcore/api.cpp:3115 (srt-xtransmit+0x231764)
Previous atomic read of size 1 at 0x7ba800011600 by thread T4 (mutexes: write M72756505476142592):
#0 pthread_mutex_unlock ../../../../src/libsanitizer/sanitizer_common/sanitizer_common_interceptors.inc:4254 (libtsan.so.0+0x3bff8)
#1 m_ControlLock::unlock() srt-xtransmit/submodule/srt/srtcore/sync_posix.cpp:225 (srt-xtransmit+0x32ae85)
#2 srt::sync::ScopedLock::~ScopedLock() srt-xtransmit/submodule/srt/srtcore/sync_posix.cpp:241 (srt-xtransmit+0x32af95)
#3 srt::CUDTUnited::close(srt::CUDTSocket*) srt-xtransmit/submodule/srt/srtcore/api.cpp:2107 (srt-xtransmit+0x23bada)
#4 srt::CUDTUnited::close(int) srt-xtransmit/submodule/srt/srtcore/api.cpp:1894 (srt-xtransmit+0x23b3c0)
#5 srt::CUDT::close(int) srt-xtransmit/submodule/srt/srtcore/api.cpp:3542 (srt-xtransmit+0x242cf2)
#6 srt_close srt-xtransmit/submodule/srt/srtcore/srt_c_api.cpp:157 (srt-xtransmit+0x322571)
#7 xtransmit::socket::srt::~srt() srt-xtransmit/xtransmit/srt_socket.cpp:90 (srt-xtransmit+0x1094b9) |
Revise SRT core synchronisation.
TODO
Done
Related PRs
CRendezvousQueue
and remove a socket from the queue if it is not connecting (m_bConnecting = false).
The text was updated successfully, but these errors were encountered: