[Epic] Core Synchronization Issues #1610

Open
6 of 8 tasks
maxsharabayko opened this issue Oct 15, 2020 · 4 comments
Labels: [core] (Area: Changes in SRT library core), Epic, Priority: Critical

Comments

@maxsharabayko
Collaborator

maxsharabayko commented Oct 15, 2020

Revise SRT core synchronisation.

TODO

Done

Related PRs

@maxsharabayko maxsharabayko added the [core] Area: Changes in SRT library core label Oct 15, 2020
@maxsharabayko maxsharabayko added this to the v1.5.0 milestone Oct 15, 2020
@maxsharabayko maxsharabayko self-assigned this Oct 15, 2020
@maxsharabayko
Collaborator Author

maxsharabayko commented Oct 15, 2020

Core threads synchronization overview.

Note: CRcvQueue::m_pRcvUList may still contain this socket and may be processing it during releaseSynch(), so some synchronization is likely required.

tsbpd()

A separate internal thread per receiving socket.

CUDT::tsbpd()

{
    UniqueLock recv_lock  (self->m_RecvLock);
    CSync recvdata_cc (self->m_RecvDataCond, recv_lock);
    CSync tsbpd_cc    (self->m_RcvTsbPdCond, recv_lock);
    
    while (!self->m_bClosing)
    {
        enterCS(self->m_RcvBufferLock);
        self->m_pRcvBuffer->getRcvFirstMsg();
        self->m_pRcvBuffer->skipData(seqlen);
        self->m_pRcvBuffer->isRcvDataReady(..);
        leaveCS(self->m_RcvBufferLock);
        
        if (self->m_bSynRecving)
            recvdata_cc.signal_locked(recv_lock);
        self->s_UDTUnited.m_EPoll.update_events(self->m_SocketID, self->m_sPollID, SRT_EPOLL_IN, true);
        CGlobEvent::triggerEvent();
        
        if (tsbpdtime)
            tsbpd_cc.wait_for(timediff);
        else
            tsbpd_cc.wait();
    }
}
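The tsbpd() loop above boils down to a condition-variable wait that sleeps until the next play time when one is known (tsbpd_cc.wait_for(timediff)) and otherwise blocks until another thread signals (tsbpd_cc.wait()). Below is a minimal standalone sketch of that pattern using std::condition_variable instead of SRT's srt::sync wrappers. TsbpdSketch, schedule() and delivered() are hypothetical names, and a counter stands in for waking readers and updating epoll.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <thread>

// Hypothetical stand-in for the per-socket TSBPD thread; not SRT's API.
class TsbpdSketch
{
public:
    using Clock = std::chrono::steady_clock;

    ~TsbpdSketch() { close(); }

    void start() { worker_ = std::thread(&TsbpdSketch::run, this); }

    // Like CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock): change the shared
    // state under the lock, then wake the TSBPD thread so it reschedules.
    void schedule(Clock::time_point play_time)
    {
        std::lock_guard<std::mutex> lk(mtx_);
        next_play_time_ = play_time;
        cond_.notify_one();
    }

    void close()
    {
        {
            std::lock_guard<std::mutex> lk(mtx_);
            closing_ = true;
        }
        cond_.notify_one();
        if (worker_.joinable())
            worker_.join();
    }

    int delivered() const { return delivered_; } // read only after close()

private:
    void run()
    {
        std::unique_lock<std::mutex> lk(mtx_);
        while (!closing_)
        {
            if (next_play_time_)
            {
                // A packet is scheduled: sleep until its play time (cf. wait_for(timediff)),
                // returning early if another thread signals a change.
                const Clock::time_point deadline = *next_play_time_;
                cond_.wait_until(lk, deadline);
                // Re-check against the clock: the wakeup may be a signal or a reschedule.
                if (next_play_time_ && Clock::now() >= *next_play_time_)
                {
                    ++delivered_;            // stands in for waking readers / updating epoll
                    next_play_time_.reset(); // nothing scheduled until the next signal
                }
            }
            else
            {
                cond_.wait(lk); // nothing scheduled: block until signalled (cf. wait())
            }
        }
    }

    std::mutex mtx_;
    std::condition_variable cond_;
    std::optional<Clock::time_point> next_play_time_;
    bool closing_ = false;
    int delivered_ = 0;
    std::thread worker_;
};
```

Re-checking the deadline against the clock after each wakeup, rather than trusting the cv_status result, keeps a reschedule that races with the timeout from being lost.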

receiveMessage()

CUDT::receiveMessage() runs in the application thread that calls srt_recvmsg(..).

CUDT::receiveMessage()

{
    UniqueLock recvguard (m_RecvLock);
    CSync tscond     (m_RcvTsbPdCond,  recvguard);
    
    if (m_bBroken || m_bClosing)
    {
        enterCS(m_RcvBufferLock);
        const int res = m_pRcvBuffer->readMsg(data, len);
        leaveCS(m_RcvBufferLock);
        if (m_bTsbPd)
        {
            HLOGP(tslog.Debug, "Ping TSBPD thread to schedule wakeup");
            tscond.signal_locked(recvguard);
        }
    }
    
    if (!m_bSynRecving)
    {
        enterCS(m_RcvBufferLock);
        const int res = m_pRcvBuffer->readMsg(data, len, (w_mctrl), seqdistance);
        leaveCS(m_RcvBufferLock);
        
        if (res == 0)
        {
            if (m_bTsbPd)
            {
                HLOGP(arlog.Debug, "receiveMessage: nothing to read, kicking TSBPD, return AGAIN");
                tscond.signal_locked(recvguard);
            }
            s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);
            return 0;
        }
        
        if (!m_pRcvBuffer->isRcvDataReady())
        {
            // Kick TsbPd thread to schedule next wakeup (if running)
            if (m_bTsbPd)
            {
                HLOGP(arlog.Debug, "receiveMessage: DATA READ, but nothing more - kicking TSBPD.");
                tscond.signal_locked(recvguard);
            }
        }
        return res;
    }
    
    CSync recv_cond (m_RecvDataCond, recvguard);
    do
    {
        if (stillConnected() && !timeout && !m_pRcvBuffer->isRcvDataReady(..))
        {
            /* Kick TsbPd thread to schedule next wakeup (if running) */
            if (m_bTsbPd)
                tscond.signal_locked(recvguard);

            do
            {
                if (!recv_cond.wait_until(exptime)) // Releases m_RecvLock while waiting
                {
                    if (m_iRcvTimeOut >= 0) // otherwise it's "no timeout set"
                        timeout = true;
                }
            } while (stillConnected() && !timeout && (!m_pRcvBuffer->isRcvDataReady()));
            
            
        }
        
        enterCS(m_RcvBufferLock);
        res = m_pRcvBuffer->readMsg((data), len, (w_mctrl), seqdistance);
        leaveCS(m_RcvBufferLock);

    } while ((res == 0) && !timeout);
            
    if (!m_pRcvBuffer->isRcvDataReady())
    {
        // Kick TsbPd thread to schedule next wakeup (if running)
        if (m_bTsbPd)
            tscond.signal_locked(recvguard);

        s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);
    }
    
    return res;
}
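The blocking branch of receiveMessage() waits on m_RecvDataCond with a deadline and re-checks readiness after every wakeup. A minimal sketch of the same shape, assuming a single mutex (SRT additionally takes m_RcvBufferLock around the buffer) and a hypothetical BlockingReceiver class; std::nullopt stands in for the "timeout / nothing to read" result.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Hypothetical reader-side sketch: one lock guards the buffer and one condition
// variable plays the role of m_RecvDataCond. Not SRT's API.
class BlockingReceiver
{
public:
    using Clock = std::chrono::steady_clock;

    // Producer side (the TSBPD thread in the original): make a message readable, wake a reader.
    void deliver(std::string msg)
    {
        {
            std::lock_guard<std::mutex> lk(mtx_);
            ready_.push_back(std::move(msg));
        }
        data_cond_.notify_one();
    }

    void close()
    {
        {
            std::lock_guard<std::mutex> lk(mtx_);
            closing_ = true;
        }
        data_cond_.notify_all();
    }

    // Blocking read with a timeout (cf. the do/while around recv_cond.wait_until(exptime)).
    std::optional<std::string> receive(std::chrono::milliseconds timeout)
    {
        const auto exptime = Clock::now() + timeout;
        std::unique_lock<std::mutex> lk(mtx_);
        // The predicate is re-evaluated after every wakeup, including spurious ones;
        // the lock is released while sleeping.
        if (!data_cond_.wait_until(lk, exptime, [this] { return closing_ || !ready_.empty(); }))
            return std::nullopt; // timed out (cf. timeout = true)
        if (ready_.empty())
            return std::nullopt; // woken because the socket is closing, nothing left to read
        std::string msg = std::move(ready_.front());
        ready_.pop_front();
        return msg;
    }

private:
    std::mutex mtx_;
    std::condition_variable data_cond_;
    std::deque<std::string> ready_;
    bool closing_ = false;
};
```

The predicate overload of wait_until performs the "wait, then re-check stillConnected() and readiness" loop internally, so spurious wakeups and early signals are handled the same way as in the excerpt.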

processData(..)

CUDT::processData(CUnit*) is called from the internal receiving thread (CRcvQueue::worker).

CUDT::processData(..)

{
    const bool need_tsbpd = m_bTsbPd || m_bGroupTsbPd;

    // We are receiving data, start tsbpd thread if TsbPd is enabled
    if (need_tsbpd && !m_RcvTsbPdThread.joinable())
    {
        StartThread(m_RcvTsbPdThread, CUDT::tsbpd, this, thname);
    }
    
    {
        UniqueLock recvbuf_acklock(m_RcvBufferLock);
        m_pRcvBuffer->addData(*i, offset);
    }

    // Wake up TSBPD on loss to reschedule possible TL drop
    if (!srt_loss_seqs.empty() && m_bTsbPd)
        CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);

    if (!filter_loss_seqs.empty() && m_bTsbPd)
        CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);
}
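processData() starts the TSBPD thread lazily when it is not joinable, while releaseSynch() joins it under m_RecvDataLock; the excerpt shows no lock around the start. A minimal sketch, with hypothetical names, of guarding both the start and the join with one mutex so that a thread can never be started concurrently with, or after, the join:

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>

// Hypothetical sketch: lazily start a per-socket worker on the first data packet
// and join it on close, with one "startup" mutex so start and join cannot interleave.
class LazyWorker
{
public:
    ~LazyWorker() { close(); }

    // Called from the receiving thread for every data packet (cf. processData()).
    void on_data()
    {
        std::lock_guard<std::mutex> lk(startup_mtx_);
        if (closing_ || worker_.joinable())
            return; // already started, or the socket is being closed
        worker_ = std::thread([this] { starts_.fetch_add(1); });
    }

    // Called on close (cf. releaseSynch()).
    void close()
    {
        std::lock_guard<std::mutex> lk(startup_mtx_);
        closing_ = true; // no new thread may be started after this point
        if (worker_.joinable())
            worker_.join();
    }

    int starts() const { return starts_.load(); }

private:
    std::mutex startup_mtx_;
    std::thread worker_;
    bool closing_ = false;
    std::atomic<int> starts_{0};
};
```

Joining while holding the mutex is only safe because the worker itself never takes startup_mtx_; if it did, close() would deadlock.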

releaseSynch()

CUDT::releaseSynch() is called:

  • on receiving UMSG_SHUTDOWN
  • in CUDT::checkExpTimer
  • in CUDT::processData (on "SEQUENCE DISCREPANCY. BREAKING CONNECTION")
  • in srt_close(), or in the sending thread in non-blocking mode
  • in the garbage collector thread (checkBrokenSockets(..) or makeClosed())
  • in srt_connect

CUDT::releaseSynch()

{
    // wake up user calls
    CSync::lock_signal(m_SendBlockCond, m_SendBlockLock);

    enterCS(m_SendLock);
    leaveCS(m_SendLock);

    m_ReadyToReadEvent.lock_notify_one();
    CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);

    enterCS(m_RecvDataLock);
    if (m_RcvTsbPdThread.joinable())
    {
        m_RcvTsbPdThread.join();
    }
    leaveCS(m_RecvDataLock);

    enterCS(m_RecvLock);
    leaveCS(m_RecvLock);
}
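The empty critical sections in releaseSynch() (enterCS(m_SendLock); leaveCS(m_SendLock); and likewise for m_RecvLock) act as a barrier: once the closing state is set, acquiring and releasing the lock guarantees that any thread already inside that critical section has left it. A minimal sketch of the idiom; SendSide, send_once() and release() are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>

// Hypothetical sketch of the "lock, then immediately unlock" barrier; not SRT's API.
struct SendSide
{
    std::mutex send_lock;
    std::atomic<bool> closing{false};
    std::atomic<int> sent{0};

    // App thread (cf. a send call holding m_SendLock).
    bool send_once()
    {
        std::lock_guard<std::mutex> lk(send_lock);
        if (closing.load())
            return false;  // socket is closing: refuse new work
        sent.fetch_add(1); // stands in for handing one packet to the send buffer
        return true;
    }

    // Closing thread (cf. releaseSynch()).
    void release()
    {
        closing.store(true);
        // Acquire and release: this returns only after any send_once() that was already
        // inside the critical section (possibly having read closing == false) has left it.
        // Every send_once() that locks afterwards observes closing == true.
        std::lock_guard<std::mutex> barrier(send_lock);
    }
};
```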

@maxsharabayko
Collaborator Author

maxsharabayko commented Nov 12, 2020

TODO

  • Make access to CRcvQueue thread-safe (now atomic).
Data race around CRcvQueue::m_bClosing:
  Write of size 1 at 0x7b4c0000005c by thread T1 (mutexes: write M315, write M312):
    #0 CRcvQueue::setClosing() queue.h:481 (test-srt:x86_64+0x1001de909)
       [NO LOCKS on modifying m_bClosing, but also calls releaseSynch that locks/unlocks/signals]
    #1 CUDTUnited::removeSocket(int) api.cpp:2750 (test-srt:x86_64+0x1001de3e6)
       [NO LOCKS, scoped lock of m_AcceptLock afterwards]
    #2 CUDTUnited::checkBrokenSockets() api.cpp:2646 (test-srt:x86_64+0x1001dcc4e)
       [LOCKS CUDTUnited::m_GlobControlLock]
    #3 CUDTUnited::garbageCollect(void*) api.cpp:3040 (test-srt:x86_64+0x1001c0032)
       [LOCKS CUDTUnited::m_GCStopLock]

  Previous read of size 1 at 0x7b4c0000005c by thread T4:
    #0 CRcvQueue::worker(void*) queue.cpp:1189 (test-srt:x86_64+0x1003b0e8b)
       [NO LOCKS on reading of m_bClosing]
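The race above is a plain bool written by the GC thread (setClosing()) and polled without a lock by CRcvQueue::worker(). A minimal sketch of the atomic-flag fix, assuming a hypothetical RcvQueueSketch class rather than SRT's actual CRcvQueue:

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical sketch (not SRT's CRcvQueue): the flag is written by one thread
// and polled by the worker without a lock, so it must be std::atomic.
class RcvQueueSketch
{
public:
    ~RcvQueueSketch() { stop(); }

    void start() { worker_ = std::thread(&RcvQueueSketch::worker, this); }

    // cf. CRcvQueue::setClosing(), called from the GC thread.
    void setClosing() { closing_.store(true); }

    void stop()
    {
        setClosing();
        if (worker_.joinable())
            worker_.join();
    }

    long iterations() const { return iterations_; } // read only after stop()

private:
    // cf. CRcvQueue::worker(): polls the flag on every loop iteration.
    void worker()
    {
        while (!closing_.load())
        {
            ++iterations_; // stands in for receiving and dispatching one packet
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    std::atomic<bool> closing_{false};
    long iterations_ = 0;
    std::thread worker_;
};
```

std::atomic<bool> removes the race on the flag itself; other state shared between the worker and the closing thread still needs its own synchronization.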

@maxsharabayko
Collaborator Author

maxsharabayko commented Jun 22, 2022

Created a separate issue #2970.
Simultaneous access to the listener's config (CUDT::m_config) from SRT's internal newConnection(..) and from the application thread that called srt_listen(). Although changing the listener's options after srt_listen() does not seem the right thing to do, the data race still has to be avoided.

WARNING: ThreadSanitizer: data race (pid=2590)
  Read of size 8 at 0x7ba8000000a8 by thread T3 (mutexes: write M227):
    #0 memcpy ../../../../src/libsanitizer/sanitizer_common/sanitizer_common_interceptors.inc:827 (libtsan.so.0+0x6243e)
    #1 memcpy ../../../../src/libsanitizer/sanitizer_common/sanitizer_common_interceptors.inc:819 (libtsan.so.0+0x6243e)
    #2 srt::CSrtConfig::operator=(srt::CSrtConfig const&) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/socketconfig.h:167 (srt-xtransmit+0x2b6d98)
    #3 srt::CUDT::CUDT(srt::CUDTSocket*, srt::CUDT const&) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/core.cpp:335 (srt-xtransmit+0x2796e6)
    #4 srt::CUDTSocket::CUDTSocket(srt::CUDTSocket const&) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/api.h:113 (srt-xtransmit+0x24de9e)
    #5 srt::CUDTUnited::newConnection(int, srt::sockaddr_any const&, srt::CPacket const&, srt::CHandShake&, int&, srt::CUDT*&) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/api.cpp:544 (srt-xtransmit+0x233312)
    #6 srt::CUDT::processConnectRequest(srt::sockaddr_any const&, srt::CPacket&) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/core.cpp:11024 (srt-xtransmit+0x2b2175)
    #7 srt::CRcvQueue::worker_ProcessConnectionRequest(srt::CUnit*, srt::sockaddr_any const&) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/queue.cpp:1406 (srt-xtransmit+0x309743)
    #8 srt::CRcvQueue::worker(void*) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/queue.cpp:1238 (srt-xtransmit+0x30828c)

  Previous write of size 1 at 0x7ba8000000a9 by main thread (mutexes: write M188, write M193, write M192):
    #0 (anonymous namespace)::CSrtConfigSetter<(SRT_SOCKOPT)2>::set(srt::CSrtConfig&, void const*, int) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/socketconfig.cpp:213 (srt-xtransmit+0x31c8c3)
    #1 (anonymous namespace)::dispatchSet(SRT_SOCKOPT, srt::CSrtConfig&, void const*, int) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/socketconfig.cpp:903 (srt-xtransmit+0x31acf0)
    #2 srt::CSrtConfig::set(SRT_SOCKOPT, void const*, int) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/socketconfig.cpp:954 (srt-xtransmit+0x31a7fd)
    #3 srt::CUDT::setOpt(SRT_SOCKOPT, void const*, int) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/core.cpp:404 (srt-xtransmit+0x27bbe4)
    #4 srt::CUDT::setsockopt(int, int, SRT_SOCKOPT, void const*, int) /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/api.cpp:3641 (srt-xtransmit+0x243f71)
    #5 srt_setsockopt /mnt/d/Projects/srt/srt-xtransmit-multi/submodule/srt/srtcore/srt_c_api.cpp:165 (srt-xtransmit+0x3227c9)
    #6 xtransmit::socket::srt::configure_post(int) /mnt/d/Projects/srt/srt-xtransmit-multi/xtransmit/srt_socket.cpp:324 (srt-xtransmit+0x10b67a)
    #7 xtransmit::socket::srt::listen() /mnt/d/Projects/srt/srt-xtransmit-multi/xtransmit/srt_socket.cpp:104 (srt-xtransmit+0x10968a)
    #8 xtransmit::create_connection(std::vector<UriParser, std::allocator<UriParser> > const&, std::shared_ptr<xtransmit::socket::isocket>&) /mnt/d/Projects/srt/srt-xtransmit-multi/xtransmit/misc.cpp:70 (srt-xtransmit+0xd330f)
    #9 xtransmit::common_run(std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > const&, xtransmit::stats_config const&, xtransmit::conn_config const&, std::atomic<bool> const&, std::function<void (std::shared_ptr<xtransmit::socket::isocket>, std::atomic<bool> const&)>&) /mnt/d/Projects/srt/srt-xtransmit-multi/xtransmit/misc.cpp:145 (srt-xtransmit+0xd37f6)
    #10 xtransmit::receive::run(std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > const&, xtransmit::receive::config const&, std::atomic<bool> const&) /mnt/d/Projects/srt/srt-xtransmit-multi/xtransmit/receive.cpp:141 (srt-xtransmit+0xe190f)
    #11 main /mnt/d/Projects/srt/srt-xtransmit-multi/xtransmit/xtransmit-app.cpp:231 (srt-xtransmit+0x127092)

The application side (xtransmit/srt_socket.cpp) sets options on the listener after srt_listen():

void socket::srt::listen()
{
	int num_clients = 2;
	int res = srt_listen(m_bind_socket, num_clients);
	if (res == SRT_ERROR)
	{
		srt_close(m_bind_socket);
		raise_exception("listen");
	}

	spdlog::debug(LOG_SOCK_SRT "0x{:X} (srt://{}:{:d}) Listening", m_bind_socket, m_host, m_port);
	res = configure_post(m_bind_socket);
	if (res == SRT_ERROR)
		raise_exception("listen::configure_post");
}
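One way to avoid the race between srt_setsockopt() on the listener and the config copy made in newConnection(..) is to guard both with the same mutex. A minimal sketch; ConfigSketch, ListenerSketch and the latency_ms/passphrase fields are hypothetical stand-ins for CSrtConfig and the listening CUDT, not SRT's API.

```cpp
#include <cassert>
#include <mutex>
#include <string>

// Hypothetical stand-in for CSrtConfig; the fields are illustrative only.
struct ConfigSketch
{
    int latency_ms = 120;
    std::string passphrase;
};

// Hypothetical listener: every access to the config goes through one mutex, so a
// setsockopt() from the app thread cannot overlap with the copy that the receiving
// thread makes for an accepted socket.
class ListenerSketch
{
public:
    // App thread (cf. srt_setsockopt() -> CUDT::setOpt()).
    void setLatency(int ms)
    {
        std::lock_guard<std::mutex> lk(config_mtx_);
        config_.latency_ms = ms;
    }

    // Receiving thread, on a connection request (cf. newConnection()).
    ConfigSketch snapshotForAccepted() const
    {
        std::lock_guard<std::mutex> lk(config_mtx_);
        return config_; // copy made while holding the lock
    }

private:
    mutable std::mutex config_mtx_;
    ConfigSketch config_;
};
```

The accepted socket works on its own copy, so later changes to the listener do not affect connections that were already accepted.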

@maxsharabayko
Collaborator Author

CUDTSocket::m_ControlLock is destroyed while another thread is still holding it.
It can't be locked in the CUDTSocket destructor, because that may cause a lock-order inversion with CUDTUnited::m_GlobControlLock.

WARNING: ThreadSanitizer: data race (pid=2590)
  Write of size 1 at 0x7ba800011600 by thread T1 (mutexes: write M169, write M166):
    #0 pthread_mutex_destroy ../../../../src/libsanitizer/tsan/tsan_interceptors_posix.cpp:1244 (libtsan.so.0+0x39398)
    #1 m_ControlLock::~Mutex() srt-xtransmit/submodule/srt/srtcore/sync_posix.cpp:215 (srt-xtransmit+0x32ae05) m_ControlLock
    #2 srt::CUDTSocket::~CUDTSocket() srt-xtransmit/submodule/srt/srtcore/api.cpp:100 (srt-xtransmit+0x2302cf)
    #3 srt::CUDTUnited::removeSocket(int) srt-xtransmit/submodule/srt/srtcore/api.cpp:2780 (srt-xtransmit+0x23fa6f)
    #4 srt::CUDTUnited::checkBrokenSockets() srt-xtransmit/submodule/srt/srtcore/api.cpp:2696 (srt-xtransmit+0x23f1fa)
    #5 srt::CUDTUnited::garbageCollect(void*) srt-xtransmit/submodule/srt/srtcore/api.cpp:3115 (srt-xtransmit+0x231764)

  Previous atomic read of size 1 at 0x7ba800011600 by thread T4 (mutexes: write M72756505476142592):
    #0 pthread_mutex_unlock ../../../../src/libsanitizer/sanitizer_common/sanitizer_common_interceptors.inc:4254 (libtsan.so.0+0x3bff8)
    #1 m_ControlLock::unlock() srt-xtransmit/submodule/srt/srtcore/sync_posix.cpp:225 (srt-xtransmit+0x32ae85)
    #2 srt::sync::ScopedLock::~ScopedLock() srt-xtransmit/submodule/srt/srtcore/sync_posix.cpp:241 (srt-xtransmit+0x32af95)
    #3 srt::CUDTUnited::close(srt::CUDTSocket*) srt-xtransmit/submodule/srt/srtcore/api.cpp:2107 (srt-xtransmit+0x23bada)
    #4 srt::CUDTUnited::close(int) srt-xtransmit/submodule/srt/srtcore/api.cpp:1894 (srt-xtransmit+0x23b3c0)
    #5 srt::CUDT::close(int) srt-xtransmit/submodule/srt/srtcore/api.cpp:3542 (srt-xtransmit+0x242cf2)
    #6 srt_close srt-xtransmit/submodule/srt/srtcore/srt_c_api.cpp:157 (srt-xtransmit+0x322571)
    #7 xtransmit::socket::srt::~srt() srt-xtransmit/xtransmit/srt_socket.cpp:90 (srt-xtransmit+0x1094b9)
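A common alternative to locking in the destructor is shared ownership: if close() holds its own reference to the socket for the whole call, the GC thread dropping its reference can no longer destroy the mutex while close() still holds it. A minimal sketch of that technique with std::shared_ptr; SocketSketch, closeSocket() and removeSocket() are hypothetical, and this is not necessarily how SRT resolves the issue.

```cpp
#include <cassert>
#include <memory>
#include <mutex>

// Hypothetical socket that owns its control mutex (cf. CUDTSocket::m_ControlLock).
struct SocketSketch
{
    std::mutex control_lock;
    bool closed = false;
    static int destroyed; // counts destructor calls, for illustration
    ~SocketSketch() { ++destroyed; }
};
int SocketSketch::destroyed = 0;

// App thread (cf. CUDTUnited::close()): the by-value parameter keeps the socket alive
// for the whole call, so the lock below is released before the object can be destroyed.
void closeSocket(std::shared_ptr<SocketSketch> s)
{
    std::lock_guard<std::mutex> lk(s->control_lock);
    s->closed = true;
} // lk unlocks first; s releases its reference afterwards

// GC thread (cf. removeSocket()): dropping the registry's reference destroys the
// socket only if no other owner still holds one.
void removeSocket(std::shared_ptr<SocketSketch>& registry_slot)
{
    registry_slot.reset();
}
```

Local objects such as the lock guard are destroyed before the by-value parameter releases its reference, so the mutex is always unlocked before the last owner can destroy it.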
