Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduced the CSharedResource #2293

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ void srt::CUDT::setOpt(SRT_SOCKOPT optName, const void* optval, int optlen)
// Restriction check
const int oflags = s_sockopt_action.flags[optName];

ScopedLock cg (m_ConnectionLock);
CScopedResourceLock cg (m_ConnectionResources);
ScopedLock sendguard (m_SendLock);
ScopedLock recvguard (m_RecvLock);

Expand Down Expand Up @@ -436,7 +436,7 @@ void srt::CUDT::setOpt(SRT_SOCKOPT optName, const void* optval, int optlen)

void srt::CUDT::getOpt(SRT_SOCKOPT optName, void *optval, int &optlen)
{
ScopedLock cg(m_ConnectionLock);
CScopedResourceLock cg(m_ConnectionResources);

switch (optName)
{
Expand Down Expand Up @@ -912,7 +912,7 @@ void srt::CUDT::clearData()

void srt::CUDT::open()
{
ScopedLock cg(m_ConnectionLock);
CScopedResourceLock cg(m_ConnectionResources);

clearData();

Expand Down Expand Up @@ -965,7 +965,7 @@ void srt::CUDT::open()

void srt::CUDT::setListenState()
{
ScopedLock cg(m_ConnectionLock);
CScopedResourceLock cg(m_ConnectionResources);

if (!m_bOpened)
throw CUDTException(MJ_NOTSUP, MN_NONE, 0);
Expand Down Expand Up @@ -1963,6 +1963,7 @@ bool srt::CUDT::processSrtMsg(const CPacket *ctrlpkt)
// and the appropriate message must be constructed for sending.
// No further processing required
{
CScopedResourceLock cg(m_ConnectionResources);
uint32_t srtdata_out[SRTDATA_MAXSIZE];
size_t len_out = 0;
res = m_pCryptoControl->processSrtMsg_KMREQ(srtdata, len, CUDT::HS_VERSION_UDT4,
Expand Down Expand Up @@ -3346,7 +3347,7 @@ void srt::CUDT::synchronizeWithGroup(CUDTGroup* gp)

void srt::CUDT::startConnect(const sockaddr_any& serv_addr, int32_t forced_isn)
{
ScopedLock cg (m_ConnectionLock);
CScopedResourceLock cg (m_ConnectionResources);

HLOGC(aclog.Debug, log << CONID() << "startConnect: -> " << serv_addr.str()
<< (m_config.bSynRecving ? " (SYNCHRONOUS)" : " (ASYNCHRONOUS)") << "...");
Expand Down Expand Up @@ -3481,7 +3482,7 @@ void srt::CUDT::startConnect(const sockaddr_any& serv_addr, int32_t forced_isn)
/*
* Race condition if non-block connect response thread scheduled before we set m_bConnecting to true?
* Connect response will be ignored and connecting will wait until timeout.
* Maybe m_ConnectionLock handling problem? Not used in CUDT::connect(const CPacket& response)
* Maybe m_ConnectionResources handling problem? Not used in CUDT::connect(const CPacket& response)
*/
m_tsLastReqTime = now;
m_bConnecting = true;
Expand Down Expand Up @@ -3728,7 +3729,7 @@ EConnectStatus srt::CUDT::processAsyncConnectResponse(const CPacket &pkt) ATR_NO
EConnectStatus cst = CONN_CONTINUE;
CUDTException e;

ScopedLock cg(m_ConnectionLock);
CScopedResourceLock cg(m_ConnectionResources);
HLOGC(cnlog.Debug, log << CONID() << "processAsyncConnectResponse: got response for connect request, processing");
cst = processConnectResponse(pkt, &e);

Expand Down Expand Up @@ -3766,7 +3767,7 @@ bool srt::CUDT::processAsyncConnectRequest(EReadStatus rst,

bool status = true;

ScopedLock cg(m_ConnectionLock);
CScopedResourceLock cg(m_ConnectionResources);
if (!m_bOpened) // Check the socket has not been closed before already.
return false;

Expand Down Expand Up @@ -4242,10 +4243,10 @@ EConnectStatus srt::CUDT::processRendezvous(
return CONN_CONTINUE;
}

// [[using locked(m_ConnectionLock)]];
// [[using locked(m_ConnectionResources)]];
EConnectStatus srt::CUDT::processConnectResponse(const CPacket& response, CUDTException* eout) ATR_NOEXCEPT
{
// NOTE: ASSUMED LOCK ON: m_ConnectionLock.
// NOTE: ASSUMED LOCK ON: m_ConnectionResources.

// this is the 2nd half of a connection request. If the connection is setup successfully this returns 0.
// Returned values:
Expand Down Expand Up @@ -4615,7 +4616,7 @@ EConnectStatus srt::CUDT::postConnect(const CPacket* pResponse, bool rendezvous,
// And, I am connected too.
m_bConnecting = false;

// The lock on m_ConnectionLock should still be applied, but
// The lock on m_ConnectionResources should still be applied, but
// the socket could have been started removal before this function
// has started. Do a sanity check before you continue with the
// connection process.
Expand Down Expand Up @@ -5654,7 +5655,7 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any&
{
HLOGC(cnlog.Debug, log << "acceptAndRespond: setting up data according to handshake");

ScopedLock cg(m_ConnectionLock);
CScopedResourceLock cg(m_ConnectionResources);

m_tsRcvPeerStartTime = steady_clock::time_point(); // will be set correctly at SRT HS

Expand Down Expand Up @@ -6145,7 +6146,7 @@ bool srt::CUDT::closeInternal()

HLOGC(smlog.Debug, log << CONID() << "CLOSING STATE. Acquiring connection lock");

ScopedLock connectguard(m_ConnectionLock);
CScopedResourceLock connectguard(m_ConnectionResources);

// Signal the sender and recver if they are waiting for data.
releaseSynch();
Expand Down Expand Up @@ -7425,7 +7426,8 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)

perf->mbpsBandwidth = Bps2Mbps(availbw * (m_iMaxSRTPayloadSize + pktHdrSize));

if (tryEnterCS(m_ConnectionLock))
//if (m_ConnectionResources.tryAcquire())
if (CScopedResourceTryLock cg = m_ConnectionResources)
{
if (m_pSndBuffer)
{
Expand Down Expand Up @@ -7471,7 +7473,7 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)
perf->msRcvBuf = 0;
}

leaveCS(m_ConnectionLock);
//m_ConnectionResources.release();
}
else
{
Expand Down Expand Up @@ -7613,9 +7615,9 @@ void srt::CUDT::initSynch()
setupMutex(m_RcvLossLock, "RcvLoss");
setupMutex(m_RecvAckLock, "RecvAck");
setupMutex(m_RcvBufferLock, "RcvBuffer");
setupMutex(m_ConnectionLock, "Connection");
setupMutex(m_StatsLock, "Stats");
setupCond(m_RcvTsbPdCond, "RcvTsbPd");
m_ConnectionResources.init();
}

void srt::CUDT::destroySynch()
Expand All @@ -7636,7 +7638,7 @@ void srt::CUDT::destroySynch()
releaseMutex(m_RcvLossLock);
releaseMutex(m_RecvAckLock);
releaseMutex(m_RcvBufferLock);
releaseMutex(m_ConnectionLock);
m_ConnectionResources.destroy();
releaseMutex(m_StatsLock);

m_RcvTsbPdCond.notify_all();
Expand Down Expand Up @@ -9347,7 +9349,7 @@ std::pair<bool, steady_clock::time_point> srt::CUDT::packData(CPacket& w_packet)

string reason = "reXmit";

ScopedLock connectguard(m_ConnectionLock);
CScopedResourceLock connectguard(m_ConnectionResources);
// If a closing action is done simultaneously, then
// m_bOpened should already be false, and it's set
// just before releasing this lock.
Expand Down
22 changes: 11 additions & 11 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ class CUDT

void addressAndSend(CPacket& pkt);

SRT_ATTR_REQUIRES(m_ConnectionLock)
SRT_ATTR_REQUIRES(m_ConnectionResources)
void sendSrtMsg(int cmd, uint32_t *srtdata_in = NULL, size_t srtlen_in = 0);

bool isOPT_TsbPd() const { return m_config.bTSBPD; }
Expand Down Expand Up @@ -401,7 +401,7 @@ class CUDT
// immediately to free the socket
void notListening()
{
sync::ScopedLock cg(m_ConnectionLock);
sync::CScopedResourceLock cg(m_ConnectionResources);
m_bListening = false;
m_pRcvQueue->removeListener(this);
}
Expand Down Expand Up @@ -466,7 +466,7 @@ class CUDT
/// @retval 1 Connection in progress (m_ConnReq turned into RESPONSE)
/// @retval -1 Connection failed

SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionResources)
EConnectStatus processConnectResponse(const CPacket& pkt, CUDTException* eout) ATR_NOEXCEPT;

// This function works in case of HSv5 rendezvous. It changes the state
Expand All @@ -486,14 +486,14 @@ class CUDT
/// @param response incoming handshake response packet to be interpreted
/// @param serv_addr incoming packet's address
/// @param rst Current read status to know if the HS packet was freshly received from the peer, or this is only a periodic update (RST_AGAIN)
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionResources)
EConnectStatus processRendezvous(const CPacket* response, const sockaddr_any& serv_addr, EReadStatus, CPacket& reqpkt);

/// Create the CryptoControl object based on the HS packet. Allocates sender and receiver buffers and loss lists.
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionResources)
bool prepareConnectionObjects(const CHandShake &hs, HandshakeSide hsd, CUDTException *eout);

SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionResources)
EConnectStatus postConnect(const CPacket* response, bool rendezvous, CUDTException* eout) ATR_NOEXCEPT;

SRT_ATR_NODISCARD bool applyResponseSettings() ATR_NOEXCEPT;
Expand All @@ -507,15 +507,15 @@ class CUDT
SRT_ATR_NODISCARD size_t fillSrtHandshake_HSRSP(uint32_t* srtdata, size_t srtlen, int hs_version);
SRT_ATR_NODISCARD size_t fillSrtHandshake(uint32_t* srtdata, size_t srtlen, int msgtype, int hs_version);

SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionResources)
bool createSrtHandshake(int srths_cmd, int srtkm_cmd, const uint32_t* data, size_t datalen,
CPacket& w_reqpkt, CHandShake& w_hs);

SRT_ATR_NODISCARD size_t fillHsExtConfigString(uint32_t *pcmdspec, int cmd, const std::string &str);
#if ENABLE_EXPERIMENTAL_BONDING
SRT_ATR_NODISCARD size_t fillHsExtGroup(uint32_t *pcmdspec);
#endif
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionResources)
size_t fillHsExtKMREQ(uint32_t *pcmdspec, size_t ki);

SRT_ATR_NODISCARD size_t fillHsExtKMRSP(uint32_t *pcmdspec, const uint32_t *kmdata, size_t kmdata_wordsize);
Expand Down Expand Up @@ -763,7 +763,7 @@ class CUDT
int m_iTsbPdDelay_ms; // Rx delay to absorb burst, in milliseconds
int m_iPeerTsbPdDelay_ms; // Tx delay that the peer uses to absorb burst, in milliseconds
bool m_bTLPktDrop; // Enable Too-late Packet Drop
SRT_ATTR_PT_GUARDED_BY(m_ConnectionLock)
SRT_ATTR_PT_GUARDED_BY(m_ConnectionResources)
UniquePtr<CCryptoControl> m_pCryptoControl; // Crypto control module
CCache<CInfoBlock>* m_pCache; // Network information cache

Expand Down Expand Up @@ -965,7 +965,7 @@ class CUDT


private: // synchronization: mutexes and conditions
sync::Mutex m_ConnectionLock; // used to synchronize connection operation
sync::CSharedResource m_ConnectionResources; ///< Protects CUDT resources from simultaneous access, including creating and destroying dynamic objects.

sync::Condition m_SendBlockCond; // used to block "send" call
sync::Mutex m_SendBlockLock; // lock associated to m_SendBlockCond
Expand Down Expand Up @@ -998,7 +998,7 @@ class CUDT

// Failure to create the crypter means that an encrypted
// connection should be rejected if ENFORCEDENCRYPTION is on.
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionResources)
bool createCrypter(HandshakeSide side, bool bidi);

private: // Generation and processing of packets
Expand Down
2 changes: 1 addition & 1 deletion srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ void CUDTGroup::setOpt(SRT_SOCKOPT optName, const void* optval, int optlen)
std::vector<CUDTSocket*> ps_vec;
{
// Do copy to avoid deadlock. CUDT::setOpt() cannot be called directly inside this loop, because
// CUDT::setOpt() will lock m_ConnectionLock, which should be locked before m_GroupLock.
// CUDT::setOpt() will lock m_ConnectionResources, which should be locked before m_GroupLock.
ScopedLock gg(m_GroupLock);
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
Expand Down
2 changes: 1 addition & 1 deletion srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,7 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
// because the next CUDT::close will not remove it from the queue when m_bConnecting = false,
// and may crash on next pass.
//
// TODO: maybe lock i->u->m_ConnectionLock?
// TODO: maybe lock i->u->m_ConnectionResources?
i->u->m_bConnecting = false;
remove(i->u->m_SocketID);

Expand Down
50 changes: 50 additions & 0 deletions srtcore/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,3 +353,53 @@ int srt::sync::genRandomInt(int minVal, int maxVal)
#endif // HAVE_CXX11
}

////////////////////////////////////////////////////////////////////////////////
//
// CSharedResource class
//
////////////////////////////////////////////////////////////////////////////////

void srt::sync::CSharedResource::release()
{
ScopedLock lock(m_lock);
SRT_ASSERT(m_iResourceTaken > 0);
--m_iResourceTaken;
if (m_iResourceTaken == 0)
m_cond.notify_one();
}

void srt::sync::CSharedResource::acquire()
{
UniqueLock lock(m_lock);

// Allow reacquiring the resource from the same thread
if (m_iResourceTaken > 0 && this_thread::get_id() == m_thid)
{
++m_iResourceTaken;
return;
}

while (m_iResourceTaken > 0)
{
m_cond.wait(lock);
}

SRT_ASSERT(m_iResourceTaken == 0);
++m_iResourceTaken;
m_thid = this_thread::get_id();
}

bool srt::sync::CSharedResource::tryAcquire()
{
ScopedLock lock(m_lock);

// Immediately withdraw from trying to acquire the taken resource from a different thread.
if (m_iResourceTaken > 0 && this_thread::get_id() != m_thid)
{
return false;
}

++m_iResourceTaken;
m_thid = this_thread::get_id();
return true;
}
Loading