Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logic refax: new rules for the receiver buffer and TSBPD #2527

Open
wants to merge 48 commits into
base: master
Choose a base branch
from

Conversation

ethouris
Copy link
Collaborator

@ethouris ethouris commented Nov 10, 2022

This changes largely the logics of the buffer and TSBPD. This change was necessary for supporting the group common buffer reception, but it should as well improve the TSBPD performance and efficiency.

The original receiver buffer in the cooperation with TSBPD was still running on the original UDT's idea of passing the packets at the ACK time, and until the ACK was done, they were not available, even if their retrieval would be theoretically possible. This design changes the TSBPD to work directly on the buffer, therefore there have been consolidated less events to trigger the TSBPD wakeup and recheck. There are actually the following situations:

  1. TSBPD sleeps forever because the buffer is empty: wake it up when a new packet has been inserted into the buffer (marked by the m_bTsbPdNeedsWakeup field).
  2. TSBPD sleeps forever because it has declared packet at [0] cell as ready to play and expects that the API call retrieve it: the API call should wake it up after the packet has been retrieved (this is the old way and remains the same here).
  3. TSBPD sleeps up to the time of delivery of a packet at [0] cell. This sleep is not interrupted in any situation.
  4. TSBPD sleeps up to the time of delivery of a jump-over packet. In this situation, only an incoming packet that has an earlier delivery time than the one currently earliest should make TSBPD woken up.

Note that in the case of forever sleeping, there's no way to distinguish the reason of "still ready to read" and "nothing to wait up to" situations - might be that this can be improved, so - for example - if there is at least one packet "in the past" (not necessarily at cell [0]) then TSBPD should NOT be woken up in the event of incoming packet, regardless of its position.

Anyway, the change for triggering TSBPD is that:

  • it has been removed from ACK handler. In the file mode ACK is still in use for signing off packets as read-ready in the currently available number in the stream mode and when a full retrievable message can be identified in the message mode, and in TSBPD nothing happens at all
  • it has been removed from the case of loss detection and dropping on demand from the peer. Packets that have been received will be delivered anyway, so dropping doesn't change anything in the packet readiness. Note that this does change things in the message mode (a scrapped message that becomes in the past towards the drop request sequence does change the earliest ready-to-deliver message), but the message mode also doesn't use TSBPD
  • there has been added triggering of TSBPD after a packet has been added to the buffer, and under specific conditions: if there has been an "earlier time" delivered, or when the m_bTsbPdNeedsWakeup flag is set.

Beside the taking out of the TSBPD triggering in the ACK handler, there's also one more change added: the ACK number to be sent back to the sender is extracted exclusively from the receiver buffer, and no longer from the loss list and internal fields. This is because the receiver buffer was added a functionality to be able to check this thing quickly. This is also a more reliable source of information and not prone to any race conditions.

Changes in the receiver buffer:

Introduced are terms of "initial contiguous region" (ICR) and "first gap". The first term existed in the old receiver buffer and it's the series of valid packets starting with the first cell up to the "first gap", that is, the series starting from at least one empty cell followed by a valid packet. None of them exist in an empty buffer. The "first gap" also may not exist if all packets from the first cell up to the one pointed by m_iMaxPosOff are valid. If the first gap starts with the first cell, this is also an "initial gap".

Two new fields were added, which are defined as offset towards m_iStartPos: m_iEndOff and m_iDropOff:

  • m_iEndOff: marks the past-the-end of the initial contiguous region. If this region is empty, m_iEndOff is 0. This field always points to an empty cell.
  • m_iDropOff: marks the position of the first valid packet following the first gap. This position is always earlier than the one pointed by m_iMaxPosOff, but later than m_iEndOff. If no such packet exists, this field has value 0. Note that there couldn't be a drop at the 0 position, at worst it can be m_iEndOff equal to 0 (which means that ICR is empty) and then m_iDropOff equal to 1, if the cell at [1] contains a valid packet.

These positions are being updated in case of various events that may influence their positions, so for example:

  1. When you want to extract a packet from the buffer, you can only retrieve one packet at position 0 at a time (in live mode - in message or stream mode, you can extract more packets, but only a full message in message mode, or only up to the position of m_iEndOff in stream mode). This shifts the initial position by 1, and may potentially reach up to m_iEndOff, but this may only happen if the buffer becomes empty. Otherwise m_iEndOff and m_iDropOff are far in forward, so nothing to change here.
  2. When a new packet comes in from the network, and it was identified to be inserted, possibilities are:
    • position equal to m_iEndOff with m_iDropOff == 0. Only m_iEndOff is increased by 1.
    • position equal to m_iEndOff with existing "first gap". If the next position towards m_iEndOff is busy, ride with m_iEndOff to find a new end of initial contiguous region, and then ride with m_iDropOff, from this position on, to find the after-gap position, both up to m_iMaxPosOff.
    • anywhere after m_iEndOff and after m_iDropOff: nothing to be updated
    • anywhere after m_iEndOff, but before m_iDropOff: find the first valid packet since m_iEndOff and set its position to m_iDropOff. NOTIFY TIME of the delivery for the new drop position.
  3. When a dropping is requested to remove the initial gap, then the buffer gets shifted (its m_iStartPos) to the position of m_iDropOff, and then the procedure of finding the end of the contiguous region and the prospective end of first gap starts anew (updateGapInfo).
  4. The search for a new gap is not done when the insertion is done on m_iEndOff, and m_iMaxPosOff is equal to it. In this case only the m_iEndOff is shifted by 1.
  5. In case when the reading out of order in message mode was done, m_iDropOff is always set to 0. This field is not in use in message mode (it points to a packet, not to a beginning of the message) and the position pointed by it may be taken out the packet, which makes it point to an empty cell.

Note that there are many mutually-exclusive conditions here, for example:

  1. Duplicated packets are discarded in advance, and therefore no flags get modified. This means, for example, that there will never be an event of inserting a packet into the initial contiguous region or at the drop position. The m_iEndOff is the earliest position where insertion may happen.
  2. The m_iDropOff never precedes m_iEndOff, unless it's 0, and if it's not 0, it always points to a valid packet.
  3. Dropping means to discard all cells from the first one up to m_iDropOff, and if this happens, it was pointing to a valid packet. After that m_iStartPos is set to that position and searching for candidates to set to m_iEndOff begins with the next packet.

There are therefore changed the procedures of:

  1. Insertion. This delivers more information so that the conditions for triggering TSBPD can be properly determined.
  2. Dropping. There are many dropping procedures, but only one - this for TSBPD thread - simply shifts to the position of m_iDropOff. Dropping the whole message doesn't rely on it as in the message mode dropping is only for the whole message and results either in an empty buffer or resets the initial contiguous region and hence the other flags.

@codecov-commenter
Copy link

codecov-commenter commented Nov 10, 2022

Codecov Report

Attention: Patch coverage is 0% with 364 lines in your changes missing coverage. Please review.

Project coverage is 0.00%. Comparing base (36260c3) to head (74a27ed).
Report is 33 commits behind head on master.

Files with missing lines Patch % Lines
srtcore/buffer_rcv.cpp 0.00% 274 Missing ⚠️
srtcore/buffer_rcv.h 0.00% 30 Missing ⚠️
srtcore/core.cpp 0.00% 25 Missing ⚠️
test/test_buffer_rcv.cpp 0.00% 25 Missing ⚠️
srtcore/utilities.h 0.00% 8 Missing ⚠️
srtcore/common.h 0.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master   #2527       +/-   ##
==========================================
- Coverage   65.08%   0.00%   -65.09%     
==========================================
  Files         101     101               
  Lines       17655   19849     +2194     
==========================================
- Hits        11490       0    -11490     
- Misses       6165   19849    +13684     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@ethouris ethouris added Type: Maintenance Work required to maintain or clean up the code [core] Area: Changes in SRT library core labels Nov 14, 2022
srtcore/buffer_rcv.cpp Fixed Show fixed Hide fixed
srtcore/buffer_rcv.cpp Fixed Show resolved Hide resolved
srtcore/core.cpp Fixed Show fixed Hide fixed
srtcore/core.cpp Fixed Show resolved Hide resolved
srtcore/buffer_rcv.cpp Fixed Show fixed Hide fixed
srtcore/buffer_rcv.cpp Fixed Show fixed Hide fixed
srtcore/core.h Outdated
Comment on lines 1140 to 1143
// This function is to return the packet's play time (time when
// it is submitted to the reading application) of the given packet.
// This grp passed here by void* because in the current imp it's
// unused and shall not be used in case when ENABLE_BONDING=0.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// This function is to return the packet's play time (time when
// it is submitted to the reading application) of the given packet.
// This grp passed here by void* because in the current imp it's
// unused and shall not be used in case when ENABLE_BONDING=0.
/// This function is to return the packet's play time (time when
/// it is submitted to the reading application) of the given packet.
/// This grp passed here by void* because in the current imp it's
/// unused and shall not be used in case when ENABLE_BONDING=0.

srtcore/core.cpp Outdated
THREAD_PAUSED();
tsbpd_cc.wait_until(tsNextDelivery);
bWakeupOnSignal = tsbpd_cc.wait_until(tsNextDelivery);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bWakeupOnSignal = tsbpd_cc.wait_until(tsNextDelivery);
bWokenupOnSignal = tsbpd_cc.wait_until(tsNextDelivery);

srtcore/core.cpp Outdated
THREAD_PAUSED();
tsbpd_cc.wait();
THREAD_RESUMED();
}

HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP!!!");
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP [" << (bWakeupOnSignal? "signal" : "timeout") << "]!!! - "
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP [" << (bWakeupOnSignal? "signal" : "timeout") << "]!!! - "
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP [" << (bWokenupOnSignal? "signal" : "timeout") << "]!!! - "

@ethouris ethouris marked this pull request as draft April 24, 2024 13:16
Comment on lines 452 to 468
/*
const T& operator[](int index) const
{
if (index < 0 || static_cast<size_t>(index) >= m_size)
throw_invalid_index(index);

return m_entries[index];
}

T& operator[](int index)
{
if (index < 0 || static_cast<size_t>(index) >= m_size)
throw_invalid_index(index);

return m_entries[index];
}
*/

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
srtcore/buffer_rcv.cpp Fixed Show fixed Hide fixed
srtcore/buffer_rcv.cpp Fixed Show fixed Hide fixed
srtcore/buffer_rcv.cpp Fixed Show fixed Hide fixed
srtcore/buffer_rcv.cpp Fixed Show fixed Hide fixed
srtcore/buffer_rcv.cpp Fixed Show fixed Hide fixed
srtcore/buffer_rcv.cpp Fixed Show fixed Hide fixed
srtcore/buffer_rcv.cpp Fixed Show fixed Hide fixed
srtcore/buffer_rcv.cpp Fixed Show fixed Hide fixed
srtcore/buffer_rcv.h Fixed Show fixed Hide fixed
srtcore/buffer_rcv.cpp Fixed Show fixed Hide fixed

const int pos = (m_iStartPos + offset) % m_szSize;
//const CPos newpktpos = m_iStartPos + offset;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//const CPos newpktpos = m_iStartPos + offset;

std::pair<int, int> CRcvBuffer::dropUpTo(int32_t seqno)
{
IF_RCVBUF_DEBUG(ScopedLog scoped_log);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBuffer::dropUpTo: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo);

int len = CSeqNo::seqoff(m_iStartSeqNo, seqno);
COff len = COff(CSeqNo(seqno) - m_iStartSeqNo);
//int len = CSeqNo::seqoff(m_iStartSeqNo, seqno);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//int len = CSeqNo::seqoff(m_iStartSeqNo, seqno);

if (m_iEndOff == m_iMaxPosOff)
return SRT_SEQNO_NONE;

//int offset = CSeqNo::seqoff(m_iStartSeqNo, fromseq);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//int offset = CSeqNo::seqoff(m_iStartSeqNo, fromseq);

return true;

const CPos iLastPos = CPos((iStartPos VALUE + iMaxPosOff VALUE) % int(iSize));
//const CPos iLastPos = iStartPos + iMaxPosOff;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//const CPos iLastPos = iStartPos + iMaxPosOff;

CMakeLists.txt Outdated
@@ -1527,7 +1527,7 @@ if (ENABLE_UNITTESTS AND ENABLE_CXX11)
#set_tests_properties(test-srt PROPERTIES RUN_SERIAL TRUE)
else()
set_tests_properties(${tests_srt} PROPERTIES RUN_SERIAL TRUE)
gtest_discover_tests(test-srt)
#gtest_discover_tests(test-srt)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This excludes unit testing from GitHub CI jobs and makes the CodeCov gate fail.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is CodeCov using the only possible default configuration, or it can be done with enabled on demand?

Here I can withdraw this change, but enabling these tests on demand (and not by default) is required.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I fixed this by adding an option to disable test discovery.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
[core] Area: Changes in SRT library core Type: Maintenance Work required to maintain or clean up the code
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants