Skip to content

Commit

Permalink
perf: Add a cache for first_unmarked_range() (#1582)
Browse files Browse the repository at this point in the history
* Cache results of RangeTracker::first_unmarked_range()
Coalesce additions to the tail of a RangeTracker tree to avoid unnecessary tree growth

* clean up

* more cleanup

* review fixes

* allow overlap

* final_cleanup

* Review responses

---------

Co-authored-by: Lars Eggert <[email protected]>
  • Loading branch information
jesup and larseggert authored Feb 5, 2024
1 parent 0b4b938 commit cb2d623
Showing 1 changed file with 51 additions and 10 deletions.
61 changes: 51 additions & 10 deletions neqo-transport/src/send_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,10 @@ enum RangeState {
/// range implies needing-to-be-sent, either initially or as a retransmission.
#[derive(Debug, Default, PartialEq)]
struct RangeTracker {
// offset, (len, RangeState). Use u64 for len because ranges can exceed 32bits.
/// offset, (len, RangeState). Use u64 for len because ranges can exceed 32bits.
used: BTreeMap<u64, (u64, RangeState)>,
/// this is a cache for first_unmarked_range(), which we check a log
first_unmarked: Option<(u64, Option<u64>)>,
}

impl RangeTracker {
Expand All @@ -166,19 +168,46 @@ impl RangeTracker {

/// Find the first unmarked range. If all are contiguous, this will return
/// (highest_offset(), None).
fn first_unmarked_range(&self) -> (u64, Option<u64>) {
fn first_unmarked_range(&mut self) -> (u64, Option<u64>) {
let mut prev_end = 0;

if let Some(first_unmarked) = self.first_unmarked {
return first_unmarked;
}

for (cur_off, (cur_len, _)) in &self.used {
if prev_end == *cur_off {
prev_end = cur_off + cur_len;
} else {
return (prev_end, Some(cur_off - prev_end));
let res = (prev_end, Some(cur_off - prev_end));
self.first_unmarked = Some(res);
return res;
}
}
self.first_unmarked = Some((prev_end, None));
(prev_end, None)
}

/// Check for the common case of adding to the end. If we can, do it and
/// return true.
fn extend_final_range(&mut self, new_off: u64, new_len: u64, new_state: RangeState) -> bool {
if let Some(mut last) = self.used.last_entry() {
let prev_off = *last.key();
let (prev_len, prev_state) = last.get_mut();
// allow for overlap between new chunk and the last entry
if new_off >= prev_off
&& new_off <= prev_off + *prev_len
&& new_off + new_len > prev_off + *prev_len
&& new_state == *prev_state
{
// simple case, extend the last entry
*prev_len = new_off + new_len - prev_off;
return true;
}
}
false
}

/// Turn one range into a list of subranges that align with existing
/// ranges.
/// Check impermissible overlaps in subregions: Sent cannot overwrite Acked.
Expand Down Expand Up @@ -207,6 +236,8 @@ impl RangeTracker {
let mut tmp_len = new_len;
let mut v = Vec::new();

// we already handled the case of a simple extension of the last item

// cut previous overlapping range if needed
let prev = self.used.range_mut(..tmp_off).next_back();
if let Some((prev_off, (prev_len, prev_state))) = prev {
Expand Down Expand Up @@ -300,6 +331,10 @@ impl RangeTracker {
return;
}

self.first_unmarked = None;
if self.extend_final_range(off, len as u64, state) {
return;
}
let subranges = self.chunk_range_on_edges(off, len as u64, state);

for (sub_off, sub_len, sub_state) in subranges {
Expand All @@ -315,6 +350,7 @@ impl RangeTracker {
return;
}

self.first_unmarked = None;
let len = u64::try_from(len).unwrap();
let end_off = off + len;

Expand Down Expand Up @@ -404,7 +440,7 @@ impl TxBuffer {
can_buffer
}

pub fn next_bytes(&self) -> Option<(u64, &[u8])> {
pub fn next_bytes(&mut self) -> Option<(u64, &[u8])> {
let (start, maybe_len) = self.ranges.first_unmarked_range();

if start == self.retired + u64::try_from(self.buffered()).unwrap() {
Expand Down Expand Up @@ -766,11 +802,13 @@ impl SendStream {
/// offset.
fn next_bytes(&mut self, retransmission_only: bool) -> Option<(u64, &[u8])> {
match self.state {
SendStreamState::Send { ref send_buf, .. } => {
send_buf.next_bytes().and_then(|(offset, slice)| {
SendStreamState::Send {
ref mut send_buf, ..
} => {
let result = send_buf.next_bytes();
if let Some((offset, slice)) = result {
if retransmission_only {
qtrace!(
[self],
"next_bytes apply retransmission limit at {}",
self.retransmission_offset
);
Expand All @@ -786,21 +824,24 @@ impl SendStream {
} else {
Some((offset, slice))
}
})
} else {
None
}
}
SendStreamState::DataSent {
ref send_buf,
ref mut send_buf,
fin_sent,
..
} => {
let used = send_buf.used(); // immutable first
let bytes = send_buf.next_bytes();
if bytes.is_some() {
bytes
} else if fin_sent {
None
} else {
// Send empty stream frame with fin set
Some((send_buf.used(), &[]))
Some((used, &[]))
}
}
SendStreamState::Ready { .. }
Expand Down

0 comments on commit cb2d623

Please sign in to comment.