Skip to content

Commit

Permalink
Merge branch 'master' into dev-add-custom-fmt-for-logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikołaj Małecki committed Jul 22, 2024
2 parents d8bda01 + 84d18ec commit d0f472f
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 32 deletions.
5 changes: 1 addition & 4 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ srt::CUDTUnited::CUDTUnited()
, m_GlobControlLock()
, m_IDLock()
, m_mMultiplexer()
, m_MultiplexerLock()
, m_pCache(NULL)
, m_pCache(new CCache<CInfoBlock>)
, m_bClosing(false)
, m_GCStopCond()
, m_InitLock()
Expand All @@ -195,8 +194,6 @@ srt::CUDTUnited::CUDTUnited()
setupMutex(m_GlobControlLock, "GlobControl");
setupMutex(m_IDLock, "ID");
setupMutex(m_InitLock, "Init");

m_pCache = new CCache<CInfoBlock>;
}

srt::CUDTUnited::~CUDTUnited()
Expand Down
20 changes: 15 additions & 5 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,13 @@ class CUDTUnited

private:
typedef std::map<SRTSOCKET, CUDTSocket*> sockets_t; // stores all the socket structures
sockets_t m_Sockets;
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
sockets_t m_Sockets;

#if ENABLE_BONDING
typedef std::map<SRTSOCKET, CUDTGroup*> groups_t;
groups_t m_Groups;
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
groups_t m_Groups;
#endif

sync::Mutex m_GlobControlLock; // used to synchronize UDT API
Expand All @@ -399,6 +401,7 @@ class CUDTUnited
SRTSOCKET m_SocketIDGenerator; // seed to generate a new unique socket ID
SRTSOCKET m_SocketIDGenerator_init; // Keeps track of the very first one

SRT_ATTR_GUARDED_BY(m_GlobControlLock)
std::map<int64_t, std::set<SRTSOCKET> >
m_PeerRec; // record sockets from peers to avoid repeated connection request, int64_t = (socker_id << 30) + isn

Expand Down Expand Up @@ -460,26 +463,33 @@ class CUDTUnited
const sockaddr_any& reqaddr, const CSrtMuxerConfig& cfgSocket);

private:
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
std::map<int, CMultiplexer> m_mMultiplexer; // UDP multiplexer
sync::Mutex m_MultiplexerLock;

private:
CCache<CInfoBlock>* m_pCache; // UDT network information cache
/// UDT network information cache.
/// Existence is guarded by m_GlobControlLock, but the cache itself is thread-safe.
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
CCache<CInfoBlock>* const m_pCache;

private:
srt::sync::atomic<bool> m_bClosing;
sync::Mutex m_GCStopLock;
sync::Condition m_GCStopCond;

sync::Mutex m_InitLock;
SRT_ATTR_GUARDED_BY(m_InitLock)
int m_iInstanceCount; // number of startup() called by application
SRT_ATTR_GUARDED_BY(m_InitLock)
bool m_bGCStatus; // if the GC thread is working (true)

SRT_ATTR_GUARDED_BY(m_InitLock)
sync::CThread m_GCThread;
static void* garbageCollect(void*);

SRT_ATTR_GUARDED_BY(m_GlobControlLock)
sockets_t m_ClosedSockets; // temporarily store closed sockets
#if ENABLE_BONDING
SRT_ATTR_GUARDED_BY(m_GlobControlLock)
groups_t m_ClosedGroups;
#endif

Expand Down
10 changes: 6 additions & 4 deletions srtcore/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,20 @@ template<typename T> class CCache
return 0;
}

/// Specify the cache size (i.e., max number of items).
/// @param [in] size max cache size.
private:

/// Specify the cache size (i.e., max number of items).
/// Private or else must be protected by a lock.
/// @param [in] size max cache size.
void setSizeLimit(int size)
{
m_iMaxSize = size;
m_iHashSize = size * 3;
m_vHashPtr.resize(m_iHashSize);
}

/// Clear all entries in the cache, restore to initialization state.

/// Clear all entries in the cache, restore to initialization state.
/// Private or else must be protected by a lock.
void clear()
{
for (typename std::list<T*>::iterator i = m_StorageList.begin(); i != m_StorageList.end(); ++ i)
Expand Down
13 changes: 7 additions & 6 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,7 @@ void srt::CUDT::clearData()

m_stats.tsLastSampleTime = steady_clock::now();
m_stats.traceReorderDistance = 0;
m_stats.traceBelatedTime = 0;
m_stats.sndDuration = m_stats.m_sndDurationTotal = 0;
}

Expand Down Expand Up @@ -8057,7 +8058,6 @@ bool srt::CUDT::getFirstNoncontSequence(int32_t& w_seq, string& w_log_reason)

int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
{
SRT_ASSERT(ctrlpkt.getMsgTimeStamp() != 0);
int nbsent = 0;
int local_prevack = 0;
#if ENABLE_HEAVY_LOGGING
Expand Down Expand Up @@ -10006,12 +10006,13 @@ bool srt::CUDT::overrideSndSeqNo(int32_t seq)
int srt::CUDT::checkLazySpawnTsbPdThread()
{
const bool need_tsbpd = m_bTsbPd || m_bGroupTsbPd;
if (!need_tsbpd)
return 0;

if (need_tsbpd && !m_RcvTsbPdThread.joinable())
ScopedLock lock(m_RcvTsbPdStartupLock);
if (!m_RcvTsbPdThread.joinable())
{
ScopedLock lock(m_RcvTsbPdStartupLock);

if (m_bClosing) // Check again to protect join() in CUDT::releaseSync()
if (m_bClosing) // Check m_bClosing to protect join() in CUDT::releaseSync().
return -1;

HLOGP(qrlog.Debug, "Spawning Socket TSBPD thread");
Expand Down Expand Up @@ -11958,7 +11959,7 @@ void srt::CUDT::processKeepalive(const CPacket& ctrlpkt, const time_point& tsArr
if (m_parent->m_GroupOf)
{
// Lock GlobControlLock in order to make sure that
// the state if the socket having the group and the
// the state of the socket having the group and the
// existence of the group will not be changed during
// the operation. The attempt of group deletion will
// have to wait until this operation completes.
Expand Down
2 changes: 1 addition & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,7 @@ class CUDT
sync::CThread m_RcvTsbPdThread; // Rcv TsbPD Thread handle
sync::Condition m_RcvTsbPdCond; // TSBPD signals if reading is ready. Use together with m_RecvLock
bool m_bTsbPdNeedsWakeup; // Signal TsbPd thread to wake up on RCV buffer state change.
sync::Mutex m_RcvTsbPdStartupLock; // Protects TSBPD thread creating and joining
sync::Mutex m_RcvTsbPdStartupLock; // Protects TSBPD thread creation and joining.

CallbackHolder<srt_listen_callback_fn> m_cbAcceptHook;
CallbackHolder<srt_connect_callback_fn> m_cbConnectHook;
Expand Down
20 changes: 18 additions & 2 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1503,7 +1503,10 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)

int ercode = 0;

if (was_blocked)
// This block causes waiting for any socket to accept the payload.
// This should be done only in blocking mode and only if no other socket
// accepted the payload.
if (was_blocked && none_succeeded && m_bSynSending)
{
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false);
if (!m_bSynSending)
Expand Down Expand Up @@ -1650,6 +1653,19 @@ int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
throw CUDTException(major, minor, 0);
}

for (vector<Sendstate>::iterator is = sendstates.begin(); is != sendstates.end(); ++is)
{
// Here we have a situation that at least 1 link successfully sent a packet.
// All links for which sending has failed must be closed.
if (is->stat == -1)
{
// This only sets the state to the socket; the GC process should
// pick it up at the next time.
HLOGC(gslog.Debug, log << "grp/sendBroadcast: per PARTIAL SUCCESS, closing failed @" << is->id);
is->mb->ps->setBrokenClosed();
}
}

// Now that at least one link has succeeded, update sending stats.
m_stats.sent.count(len);

Expand Down Expand Up @@ -3185,7 +3201,7 @@ void CUDTGroup::send_CloseBrokenSockets(vector<SRTSOCKET>& w_wipeme)
InvertedLock ug(m_GroupLock);

// With unlocked GroupLock, we can now lock GlobControlLock.
// This is needed prevent any of them be deleted from the container
// This is needed to prevent any of them deleted from the container
// at the same time.
ScopedLock globlock(CUDT::uglobal().m_GlobControlLock);

Expand Down
10 changes: 0 additions & 10 deletions srtcore/socketconfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,15 +559,6 @@ struct CSrtConfigSetter<SRTO_LOSSMAXTTL>
}
};

template<>
struct CSrtConfigSetter<SRTO_VERSION>
{
static void set(CSrtConfig& co, const void* optval, int optlen)
{
co.uSrtVersion = cast_optval<uint32_t>(optval, optlen);
}
};

template<>
struct CSrtConfigSetter<SRTO_MINVERSION>
{
Expand Down Expand Up @@ -972,7 +963,6 @@ int dispatchSet(SRT_SOCKOPT optName, CSrtConfig& co, const void* optval, int opt
DISPATCH(SRTO_CONNTIMEO);
DISPATCH(SRTO_DRIFTTRACER);
DISPATCH(SRTO_LOSSMAXTTL);
DISPATCH(SRTO_VERSION);
DISPATCH(SRTO_MINVERSION);
DISPATCH(SRTO_STREAMID);
DISPATCH(SRTO_CONGESTION);
Expand Down

0 comments on commit d0f472f

Please sign in to comment.