Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MAINT] Undefined (TBD) #1559

Draft
wants to merge 31 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
eadf7ab
Added basic unit test for bonding groups
Sep 18, 2020
f7e9c77
Added 2 tests and a description
Sep 18, 2020
910cf70
Fixed: one uwait call may report both IN and UPDATE
Sep 18, 2020
0645bb7
BackupPrioritySelection: allow any link to be activated first
Sep 18, 2020
b53a64b
Added more trace for the failing Travis test
Sep 18, 2020
a7a6316
Some fixes.
Oct 21, 2020
f3e9411
Merged, fixed and refactored
Dec 4, 2020
1cd8bcd
Merge branch 'master' into dev-test-group-ut
Dec 4, 2020
5834866
Added fixes for proper report blocking. Testing all group types in Co…
Dec 7, 2020
4a95481
[core] Fixed proper reporting of sending blocked state. Fixed API val…
Dec 7, 2020
c272a05
Fixed return 0 on multi-connect call. Added lock-preventive emergency…
Dec 8, 2020
834b702
Fixed TSBPD deadlock-prone continuation. Demoted epoll IPE log. Made …
Dec 8, 2020
e1503db
Fixed test cases (FileUpload fail detected)
Dec 8, 2020
f48362b
TEMPORARY CHECKIN OF UNKNOWN PURPOSE (likely merge issues)
Dec 9, 2020
4320e27
Updated and resolved
Dec 9, 2020
c3a7a3c
Updated and post-fixed. NOTE: Broadcast group still failing.
Sep 11, 2024
1e26e79
Updated
Dec 3, 2024
4a85e10
Fixed bug: send when pending in backup groups reports correct error
Dec 5, 2024
e88e794
[core] Fixed: attempt to send to a group in connection-pending state …
Dec 11, 2024
9239ba1
[TEST] Added more tests for bonding
Dec 11, 2024
ac510a6
Fixed test error report principles to avoid ASSERT and keep socket un…
Dec 12, 2024
d477e6f
Avoid using enum for flag comparison in gtest
Dec 12, 2024
956c161
Changed conflicting variable name
Dec 12, 2024
38114f9
Attempted fix for clang mac gtest
Dec 12, 2024
d139372
Other fixes for clang gtest
Dec 12, 2024
06ffc6a
Other fixes for clang gtest (new)
Dec 12, 2024
0a66943
Other fixes for clang gtest (new 2)
Dec 12, 2024
2192e52
Merge branch 'master' into dev-test-add-new-bonding
Dec 17, 2024
ccaf789
Updated
Dec 17, 2024
eb2a503
Updated from 1709 to clear out merged changes
Dec 18, 2024
6c761c3
Merge branch 'master' into dev-test-group-ut
Jan 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1673,6 +1673,26 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i

vector<SRTSOCKET> broken;

// Return value rules:
// In non-blocking mode:
// - return Socket ID, if:
// - you requested only one connection in this call
// In blocking mode:
// - return Socket ID, if:
// - you requested only one connection in this call
// - you connect a group that was not connected yet
// - otherwise return 0

// Leave the last SID value in retval if you had only one
// connection to start. Otherwise override it with 0.
if (arraysize > 1)
retval = 0;

// For blocking mode only, and only in case when the group
// was not yet connected, this retval could be overridden
// again with the first ready socket ID, and this socket ID
// will be returned.

while (block_new_opened)
{
if (spawned.empty())
Expand Down
61 changes: 50 additions & 11 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4951,6 +4951,11 @@ EConnectStatus srt::CUDT::postConnect(const CPacket* pResponse, bool rendezvous,
s->m_Status = SRTS_CONNECTED;

// acknowledde any waiting epolls to write
// This must be done AFTER the group member status is upgraded to IDLE because
// this state change will trigger the waiting function in blocking-mode groupConnect
// and this may be immediately followed by exit from connect and start sending function,
// which must see this very link already as IDLE, not PENDING, which will make this
// link unable to be used and therefore the sending call would fail.
uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_CONNECT, true);

CGlobEvent::triggerEvent();
Expand Down Expand Up @@ -5541,6 +5546,12 @@ void * srt::CUDT::tsbpd(void* param)
gkeeper.group->updateLatestRcv(self->m_parent);
}
}

// After re-acquisition of the m_RecvLock, re-check the closing flag
if (self->m_bClosing)
{
break;
}
#endif
CGlobEvent::triggerEvent();
tsNextDelivery = steady_clock::time_point(); // Ready to read, nothing to wait for.
Expand All @@ -5550,7 +5561,7 @@ void * srt::CUDT::tsbpd(void* param)
if (self->m_bClosing)
break;

SRT_ATR_UNUSED bool bWokeUpOnSignal = true;
bool bWokeUpOnSignal = true;

if (!is_zero(tsNextDelivery))
{
Expand All @@ -5566,6 +5577,7 @@ void * srt::CUDT::tsbpd(void* param)
THREAD_PAUSED();
bWokeUpOnSignal = tsbpd_cc.wait_until(tsNextDelivery);
THREAD_RESUMED();
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP on " << (bWokeUpOnSignal? "SIGNAL" : "TIMEOUIT") << "!!!");
}
else
{
Expand All @@ -5582,15 +5594,36 @@ void * srt::CUDT::tsbpd(void* param)
*/
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: no data, scheduling wakeup at ack");
self->m_bTsbPdNeedsWakeup = true;
THREAD_PAUSED();
tsbpd_cc.wait();
THREAD_RESUMED();
}

HLOGC(tslog.Debug,
log << self->CONID() << "tsbpd: WAKE UP [" << (bWokeUpOnSignal ? "signal" : "timeout") << "]!!! - "
bWokeUpOnSignal = false;
while (!bWokeUpOnSignal)
{
// For safety reasons, do wakeup once per 1/8s and re-check the flag.
// This should be enough long time that during a normal transmission
// the TSBPD thread would be woken up much earlier when required by
// ACK per ACK timer (at most 10ms since the last check) and in case
// when this might result in a deadlock, it would only hold up to 125ms,
// which should be little harmful for the application. NOTE THAT THIS
// IS A SANITY CHECK FOR A SITUATION THAT SHALL NEVER HAPPEN.
THREAD_PAUSED();
bWokeUpOnSignal = tsbpd_cc.wait_for(milliseconds_from(125));
THREAD_RESUMED();
if (self->m_bClosing && !bWokeUpOnSignal)
{
HLOGC(tslog.Debug, log << "tsbpd: IPE: Closing flag set in the meantime of waiting. Continue to EXIT");

// This break doesn't have to be done in case when signaled
// because if so this current loop will be interrupted anyway,
// and the outer loop will be terminated at the check of self->m_bClosing.
// This is only a sanity check.
break;
}
}
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP [" << (bWokeUpOnSignal ? "signal" : "timeout") << "]!!! - "
<< "NOW=" << FormatTime(steady_clock::now()));
}
}

THREAD_EXIT();
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: EXITING");
return NULL;
Expand Down Expand Up @@ -6328,7 +6361,7 @@ bool srt::CUDT::closeInternal() ATR_NOEXCEPT
// Inform the threads handler to stop.
m_bClosing = true;

HLOGC(smlog.Debug, log << CONID() << "CLOSING STATE. Acquiring connection lock");
HLOGC(smlog.Debug, log << CONID() << "CLOSING STATE (closing=true). Acquiring connection lock");

ScopedLock connectguard(m_ConnectionLock);

Expand Down Expand Up @@ -7824,15 +7857,20 @@ void srt::CUDT::destroySynch()
void srt::CUDT::releaseSynch()
{
SRT_ASSERT(m_bClosing);
if (!m_bClosing)
{
LOGC(smlog.Error, log << "releaseSynch: IPE: m_bClosing not set to false, TSBPD might hangup!");
m_bClosing = true;
}
// wake up user calls
CSync::lock_notify_one(m_SendBlockCond, m_SendBlockLock);

enterCS(m_SendLock);
leaveCS(m_SendLock);

// Awake tsbpd() and srt_recv*(..) threads for them to check m_bClosing.
CSync::lock_notify_one(m_RecvDataCond, m_RecvLock);
CSync::lock_notify_one(m_RcvTsbPdCond, m_RecvLock);
CSync::lock_notify_all(m_RecvDataCond, m_RecvLock);
CSync::lock_notify_all(m_RcvTsbPdCond, m_RecvLock);

// Azquiring m_RcvTsbPdStartupLock protects race in starting
// the tsbpd() thread in CUDT::processData().
Expand Down Expand Up @@ -9931,7 +9969,7 @@ void srt::CUDT::processClose()
m_bBroken = true;
m_iBrokenCounter = 60;

HLOGP(smlog.Debug, "processClose: sent message and set flags");
HLOGP(smlog.Debug, "processClose: (closing=true) sent message and set flags");

if (m_bTsbPd)
{
Expand Down Expand Up @@ -11683,6 +11721,7 @@ void srt::CUDT::checkTimers()

void srt::CUDT::updateBrokenConnection()
{
HLOGC(smlog.Debug, log << "updateBrokenConnection: setting closing=true and taking out epoll events");
m_bClosing = true;
releaseSynch();
// app can call any UDT API to learn the connection_broken error
Expand Down
2 changes: 1 addition & 1 deletion srtcore/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ int srt::CEPoll::update_events(const SRTSOCKET& uid, std::set<int>& eids, const
if (!pwait)
{
// As this is mapped in the socket's data, it should be impossible.
LOGC(eilog.Error, log << "epoll/update: IPE: update struck E"
HLOGC(eilog.Debug, log << "epoll/update: IPE: update struck E"
<< (*i) << " which is NOT SUBSCRIBED to @" << uid);
continue;
}
Expand Down
28 changes: 15 additions & 13 deletions test/test_connection_timeout.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,20 +170,22 @@ TEST_F(TestConnectionTimeout, Nonblocking) {
*/
TEST_F(TestConnectionTimeout, BlockingLoop)
{
const SRTSOCKET client_sock = srt_create_socket();
ASSERT_GT(client_sock, 0); // socket_id should be > 0

// Set connection timeout to 999 ms to reduce the test execution time.
// Also need to hit a time point between two threads:
// srt_connect will check TTL every second,
// CRcvQueue::worker will wait on a socket for 10 ms.
// Need to have a condition, when srt_connect will process the timeout.
const int connection_timeout_ms = 999;
EXPECT_EQ(srt_setsockopt(client_sock, 0, SRTO_CONNTIMEO, &connection_timeout_ms, sizeof connection_timeout_ms), SRT_SUCCESS);

const sockaddr* psa = reinterpret_cast<const sockaddr*>(&m_sa);
const int connection_timeout_ms = 999;
for (int i = 0; i < 10; ++i)
{
const SRTSOCKET client_sock = srt_create_socket();
EXPECT_GT(client_sock, 0); // socket_id should be > 0
if (client_sock <= 0)
break;

// Set connection timeout to 999 ms to reduce the test execution time.
// Also need to hit a time point between two threads:
// srt_connect will check TTL every second,
// CRcvQueue::worker will wait on a socket for 10 ms.
// Need to have a condition, when srt_connect will process the timeout.
EXPECT_EQ(srt_setsockopt(client_sock, 0, SRTO_CONNTIMEO, &connection_timeout_ms, sizeof connection_timeout_ms), SRT_SUCCESS);

const chrono::steady_clock::time_point chrono_ts_start = chrono::steady_clock::now();
EXPECT_EQ(srt_connect(client_sock, psa, sizeof m_sa), SRT_ERROR);

Expand All @@ -200,9 +202,9 @@ TEST_F(TestConnectionTimeout, BlockingLoop)
<< error_code << " " << srt_getlasterror_str() << "\n";
break;
}
}

EXPECT_EQ(srt_close(client_sock), SRT_SUCCESS);
EXPECT_EQ(srt_close(client_sock), SRT_SUCCESS);
}
}


Expand Down
Loading