Skip to content

Commit

Permalink
Updated to latest master
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikołaj Małecki committed Jul 18, 2024
2 parents f56dba5 + 84d18ec commit 82bfbd3
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 49 deletions.
48 changes: 27 additions & 21 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ srt::CUDTUnited::CUDTUnited()
, m_GlobControlLock()
, m_IDLock()
, m_mMultiplexer()
, m_MultiplexerLock()
, m_pCache(NULL)
, m_pCache(new CCache<CInfoBlock>)
, m_bClosing(false)
, m_GCStopCond()
, m_InitLock()
Expand All @@ -195,8 +194,6 @@ srt::CUDTUnited::CUDTUnited()
setupMutex(m_GlobControlLock, "GlobControl");
setupMutex(m_IDLock, "ID");
setupMutex(m_InitLock, "Init");

m_pCache = new CCache<CInfoBlock>;
}

srt::CUDTUnited::~CUDTUnited()
Expand Down Expand Up @@ -766,7 +763,7 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen,
enterCS(ls->m_AcceptLock);
try
{
ls->m_QueuedSockets.insert(ns->m_SocketID);
ls->m_QueuedSockets[ns->m_SocketID] = ns->m_PeerAddr;
}
catch (...)
{
Expand Down Expand Up @@ -1110,8 +1107,22 @@ SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int
}
else if (ls->m_QueuedSockets.size() > 0)
{
set<SRTSOCKET>::iterator b = ls->m_QueuedSockets.begin();
u = *b;
map<SRTSOCKET, sockaddr_any>::iterator b = ls->m_QueuedSockets.begin();

if (pw_addr != NULL && pw_addrlen != NULL)
{
// Check if the length of the buffer to fill the name in
// was large enough.
const int len = b->second.size();
if (*pw_addrlen < len)
{
// In case when the address cannot be rewritten,
// DO NOT accept, but leave the socket in the queue.
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
}
}

u = b->first;
ls->m_QueuedSockets.erase(b);
accepted = true;
}
Expand Down Expand Up @@ -1182,14 +1193,8 @@ SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int

if (pw_addr != NULL && pw_addrlen != NULL)
{
// Check if the length of the buffer to fill the name in
// was large enough.
const int len = s->m_PeerAddr.size();
if (*pw_addrlen < len)
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);

memcpy((pw_addr), &s->m_PeerAddr, len);
*pw_addrlen = len;
memcpy((pw_addr), s->m_PeerAddr.get(), s->m_PeerAddr.size());
*pw_addrlen = s->m_PeerAddr.size();
}

return u;
Expand Down Expand Up @@ -2823,23 +2828,24 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)

// if it is a listener, close all un-accepted sockets in its queue
// and remove them later
for (set<SRTSOCKET>::iterator q = s->m_QueuedSockets.begin(); q != s->m_QueuedSockets.end(); ++q)
for (map<SRTSOCKET, sockaddr_any>::iterator q = s->m_QueuedSockets.begin();
q != s->m_QueuedSockets.end(); ++ q)
{
sockets_t::iterator si = m_Sockets.find(*q);
sockets_t::iterator si = m_Sockets.find(q->first);
if (si == m_Sockets.end())
{
// gone in the meantime
LOGC(smlog.Error,
log << "removeSocket: IPE? socket @" << (*q) << " being queued for listener socket @"
<< s->m_SocketID << " is GONE in the meantime ???");
log << "removeSocket: IPE? socket @" << (q->first) << " being queued for listener socket @"
<< s->m_SocketID << " is GONE in the meantime ???");
continue;
}

CUDTSocket* as = si->second;

as->breakSocket_LOCKED();
m_ClosedSockets[*q] = as;
m_Sockets.erase(*q);
m_ClosedSockets[q->first] = as;
m_Sockets.erase(q->first);
}
}

Expand Down
22 changes: 16 additions & 6 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class CUDTSocket
CUDT m_UDT; //< internal SRT socket logic

public:
std::set<SRTSOCKET> m_QueuedSockets; //< set of connections waiting for accept()
std::map<SRTSOCKET, sockaddr_any> m_QueuedSockets; //< set of connections waiting for accept()

sync::Condition m_AcceptCond; //< used to block "accept" call
sync::Mutex m_AcceptLock; //< mutex associated to m_AcceptCond
Expand Down Expand Up @@ -398,11 +398,13 @@ class CUDTUnited

private:
typedef std::map<SRTSOCKET, CUDTSocket*> sockets_t; // stores all the socket structures
sockets_t m_Sockets;
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
sockets_t m_Sockets;

#if ENABLE_BONDING
typedef std::map<SRTSOCKET, CUDTGroup*> groups_t;
groups_t m_Groups;
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
groups_t m_Groups;
#endif

sync::Mutex m_GlobControlLock; // used to synchronize UDT API
Expand All @@ -412,6 +414,7 @@ class CUDTUnited
SRTSOCKET m_SocketIDGenerator; // seed to generate a new unique socket ID
SRTSOCKET m_SocketIDGenerator_init; // Keeps track of the very first one

SRT_ATTR_GUARDED_BY(m_GlobControlLock)
std::map<int64_t, std::set<SRTSOCKET> >
m_PeerRec; // record sockets from peers to avoid repeated connection request, int64_t = (socker_id << 30) + isn

Expand Down Expand Up @@ -515,26 +518,33 @@ class CUDTUnited
const sockaddr_any& reqaddr, const CSrtMuxerConfig& cfgSocket);

private:
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
std::map<int, CMultiplexer> m_mMultiplexer; // UDP multiplexer
sync::Mutex m_MultiplexerLock;

private:
CCache<CInfoBlock>* m_pCache; // UDT network information cache
/// UDT network information cache.
/// Existence is guarded by m_GlobControlLock, but the cache itself is thread-safe.
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
CCache<CInfoBlock>* const m_pCache;

private:
srt::sync::atomic<bool> m_bClosing;
sync::Mutex m_GCStopLock;
sync::Condition m_GCStopCond;

sync::Mutex m_InitLock;
SRT_ATTR_GUARDED_BY(m_InitLock)
int m_iInstanceCount; // number of startup() called by application
SRT_ATTR_GUARDED_BY(m_InitLock)
bool m_bGCStatus; // if the GC thread is working (true)

SRT_ATTR_GUARDED_BY(m_InitLock)
sync::CThread m_GCThread;
static void* garbageCollect(void*);

SRT_ATTR_GUARDED_BY(m_GlobControlLock)
sockets_t m_ClosedSockets; // temporarily store closed sockets
#if ENABLE_BONDING
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
groups_t m_ClosedGroups;
#endif

Expand Down
10 changes: 6 additions & 4 deletions srtcore/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,20 @@ template<typename T> class CCache
return 0;
}

/// Specify the cache size (i.e., max number of items).
/// @param [in] size max cache size.
private:

/// Specify the cache size (i.e., max number of items).
/// Private or else must be protected by a lock.
/// @param [in] size max cache size.
void setSizeLimit(int size)
{
m_iMaxSize = size;
m_iHashSize = size * 3;
m_vHashPtr.resize(m_iHashSize);
}

/// Clear all entries in the cache, restore to initialization state.

/// Clear all entries in the cache, restore to initialization state.
/// Private or else must be protected by a lock.
void clear()
{
for (typename std::list<T*>::iterator i = m_StorageList.begin(); i != m_StorageList.end(); ++ i)
Expand Down
7 changes: 3 additions & 4 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,7 @@ void srt::CUDT::clearData()

m_stats.tsLastSampleTime = steady_clock::now();
m_stats.traceReorderDistance = 0;
m_stats.traceBelatedTime = 0;
m_stats.sndDuration = m_stats.m_sndDurationTotal = 0;
}

Expand Down Expand Up @@ -8043,7 +8044,6 @@ bool srt::CUDT::getFirstNoncontSequence(int32_t& w_seq, string& w_log_reason)

int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
{
SRT_ASSERT(ctrlpkt.getMsgTimeStamp() != 0);
int nbsent = 0;
int local_prevack = 0;
#if ENABLE_HEAVY_LOGGING
Expand Down Expand Up @@ -9995,14 +9995,13 @@ bool srt::CUDT::overrideSndSeqNo(int32_t seq)
int srt::CUDT::checkLazySpawnTsbPdThread()
{
const bool need_tsbpd = m_bTsbPd || m_bGroupTsbPd;

if (!need_tsbpd)
return 0;

ScopedLock lock(m_RcvTsbPdStartupLock);
if (!m_RcvTsbPdThread.joinable())
{
if (m_bClosing) // Check again to protect join() in CUDT::releaseSync()
if (m_bClosing) // Check m_bClosing to protect join() in CUDT::releaseSync().
return -1;

HLOGP(qrlog.Debug, "Spawning Socket TSBPD thread");
Expand Down Expand Up @@ -11953,7 +11952,7 @@ void srt::CUDT::processKeepalive(const CPacket& ctrlpkt, const time_point& tsArr
if (m_parent->m_GroupOf)
{
// Lock GlobControlLock in order to make sure that
// the state if the socket having the group and the
// the state of the socket having the group and the
// existence of the group will not be changed during
// the operation. The attempt of group deletion will
// have to wait until this operation completes.
Expand Down
2 changes: 1 addition & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,7 @@ class CUDT
sync::CThread m_RcvTsbPdThread; // Rcv TsbPD Thread handle
sync::Condition m_RcvTsbPdCond; // TSBPD signals if reading is ready. Use together with m_RecvLock
bool m_bTsbPdNeedsWakeup; // Signal TsbPd thread to wake up on RCV buffer state change.
sync::Mutex m_RcvTsbPdStartupLock; // Protects TSBPD thread creating and joining
sync::Mutex m_RcvTsbPdStartupLock; // Protects TSBPD thread creation and joining.

CallbackHolder<srt_listen_callback_fn> m_cbAcceptHook;
CallbackHolder<srt_connect_callback_fn> m_cbConnectHook;
Expand Down
22 changes: 19 additions & 3 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1501,7 +1501,10 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)

int ercode = 0;

if (was_blocked)
// This block causes waiting for any socket to accept the payload.
// This should be done only in blocking mode and only if no other socket
// accepted the payload.
if (was_blocked && none_succeeded && m_bSynSending)
{
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false);
if (!m_bSynSending)
Expand Down Expand Up @@ -1648,6 +1651,19 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
throw CUDTException(major, minor, 0);
}

for (vector<Sendstate>::iterator is = sendstates.begin(); is != sendstates.end(); ++is)
{
// Here we have a situation that at least 1 link successfully sent a packet.
// All links for which sending has failed must be closed.
if (is->stat == -1)
{
// This only sets the state to the socket; the GC process should
// pick it up at the next time.
HLOGC(gslog.Debug, log << "grp/sendBroadcast: per PARTIAL SUCCESS, closing failed @" << is->id);
is->mb->ps->setBrokenClosed();
}
}

// Now that at least one link has succeeded, update sending stats.
m_stats.sent.count(len);

Expand Down Expand Up @@ -2574,7 +2590,7 @@ class StabilityTracer
str_tnow.replace(str_tnow.find(':'), 1, 1, '_');
}
const std::string fname = "stability_trace_" + str_tnow + ".csv";
m_fout.open(fname, std::ofstream::out);
m_fout.open(fname.c_str(), std::ofstream::out);
if (!m_fout)
std::cerr << "IPE: Failed to open " << fname << "!!!\n";

Expand Down Expand Up @@ -3183,7 +3199,7 @@ void CUDTGroup::send_CloseBrokenSockets(vector<SRTSOCKET>& w_wipeme)
InvertedLock ug(m_GroupLock);

// With unlocked GroupLock, we can now lock GlobControlLock.
// This is needed prevent any of them be deleted from the container
// This is needed to prevent any of them deleted from the container
// at the same time.
ScopedLock globlock(CUDT::uglobal().m_GlobControlLock);

Expand Down
10 changes: 0 additions & 10 deletions srtcore/socketconfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,15 +559,6 @@ struct CSrtConfigSetter<SRTO_LOSSMAXTTL>
}
};

template<>
struct CSrtConfigSetter<SRTO_VERSION>
{
static void set(CSrtConfig& co, const void* optval, int optlen)
{
co.uSrtVersion = cast_optval<uint32_t>(optval, optlen);
}
};

template<>
struct CSrtConfigSetter<SRTO_MINVERSION>
{
Expand Down Expand Up @@ -971,7 +962,6 @@ int dispatchSet(SRT_SOCKOPT optName, CSrtConfig& co, const void* optval, int opt
DISPATCH(SRTO_CONNTIMEO);
DISPATCH(SRTO_DRIFTTRACER);
DISPATCH(SRTO_LOSSMAXTTL);
DISPATCH(SRTO_VERSION);
DISPATCH(SRTO_MINVERSION);
DISPATCH(SRTO_STREAMID);
DISPATCH(SRTO_CONGESTION);
Expand Down
Loading

0 comments on commit 82bfbd3

Please sign in to comment.