Skip to content

Commit

Permalink
perf: avoid sort and squash on every push (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo authored Dec 22, 2023
1 parent 58ab09a commit dae318a
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 106 deletions.
8 changes: 4 additions & 4 deletions y-octo/src/doc/codec/delete_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ impl DeleteSet {
}
}

pub fn batch_push(&mut self, client: Client, ranges: Vec<Range<u64>>) {
pub fn batch_add_ranges(&mut self, client: Client, ranges: Vec<Range<u64>>) {
match self.0.entry(client) {
Entry::Occupied(e) => {
e.into_mut().extends(ranges);
e.into_mut().extend(ranges);
}
Entry::Vacant(e) => {
e.insert(ranges.into());
Expand Down Expand Up @@ -202,13 +202,13 @@ mod tests {

{
let mut delete_set = delete_set.clone();
delete_set.batch_push(1, vec![0..5, 10..20]);
delete_set.batch_add_ranges(1, vec![0..5, 10..20]);
assert_eq!(delete_set.get(&1), Some(&OrderRange::Range(0..30)));
}

{
let mut delete_set = delete_set;
delete_set.batch_push(1, vec![40..50, 10..20]);
delete_set.batch_add_ranges(1, vec![40..50, 10..20]);
assert_eq!(delete_set.get(&1), Some(&OrderRange::from(vec![0..30, 40..50])));
}
}
Expand Down
199 changes: 100 additions & 99 deletions y-octo/src/doc/common/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ fn is_continuous_range(lhs: &Range<u64>, rhs: &Range<u64>) -> bool {
}

impl OrderRange {
pub fn ranges_len(&self) -> usize {
match self {
OrderRange::Range(_) => 1,
OrderRange::Fragment(ranges) => ranges.len(),
}
}

pub fn is_empty(&self) -> bool {
match self {
OrderRange::Range(range) => range.is_empty(),
Expand Down Expand Up @@ -121,35 +128,6 @@ impl OrderRange {
diffs
}

pub fn extends<T>(&mut self, list: T)
where
T: Into<VecDeque<Range<u64>>>,
{
let list: VecDeque<_> = list.into();
if list.is_empty() {
return;
}

if self.is_empty() {
*self = OrderRange::Fragment(list);
} else {
match self {
OrderRange::Range(old) => {
let old = old.clone();
// swap and push is faster then push one by one
*self = OrderRange::Fragment(list);
self.push(old);
}
OrderRange::Fragment(ranges) => {
ranges.extend(list);
}
}
}

self.sort();
self.squash();
}

/// Push new range to current one.
/// Range will be merged if overlap exists or turned into fragment if it's
/// not continuous.
Expand All @@ -173,65 +151,48 @@ impl OrderRange {
if ranges.is_empty() {
*self = OrderRange::Range(range);
} else {
let search_result = ranges.binary_search_by(|r| {
if is_continuous_range(r, &range) {
std::cmp::Ordering::Equal
} else if r.end < range.start {
std::cmp::Ordering::Less
} else {
std::cmp::Ordering::Greater
}
});

match search_result {
Ok(idx) => {
let old = &mut ranges[idx];
ranges[idx] = old.start.min(range.start)..old.end.max(range.end);
self.squash();
}
Err(idx) => ranges.insert(idx, range),
}
OrderRange::push_inner(ranges, range);
self.make_single();
}
}
}
}

pub fn pop(&mut self) -> Option<Range<u64>> {
if self.is_empty() {
None
} else {
match self {
OrderRange::Range(range) => Some(mem::replace(range, 0..0)),
OrderRange::Fragment(list) => list.pop_front(),
}
}
}

pub fn merge(&mut self, other: Self) {
let raw = std::mem::take(self);
if raw.is_empty() {
*self = other;
return;
self.extend(&other);
}

fn make_fragment(&mut self) {
if let OrderRange::Range(range) = self {
*self = OrderRange::Fragment(if range.is_empty() {
VecDeque::new()
} else {
VecDeque::from([range.clone()])
});
}
*self = match (raw, other) {
(OrderRange::Range(a), OrderRange::Range(b)) => {
if is_continuous_range(&a, &b) {
// merge intersected range
OrderRange::Range(a.start.min(b.start)..a.end.max(b.end))
} else {
OrderRange::Fragment(VecDeque::from([a, b]))
}
}
(OrderRange::Fragment(mut a), OrderRange::Range(b)) => {
a.push_back(b);
OrderRange::Fragment(a)
}
(OrderRange::Range(a), OrderRange::Fragment(b)) => {
let mut v = b;
v.push_back(a);
OrderRange::Fragment(v)
}
(OrderRange::Fragment(mut a), OrderRange::Fragment(mut b)) => {
a.append(&mut b);
OrderRange::Fragment(a)
}
};
}

self.sort();
self.squash()
fn make_single(&mut self) {
if let OrderRange::Fragment(ranges) = self {
if ranges.len() == 1 {
*self = OrderRange::Range(ranges[0].clone());
}
}
}

/// Merge all available ranges list into one.
fn squash(&mut self) {
pub fn squash(&mut self) {
// merge all available ranges
if let OrderRange::Fragment(ranges) = self {
if ranges.is_empty() {
Expand All @@ -240,7 +201,7 @@ impl OrderRange {
}

let mut changed = false;
let mut merged = Vec::with_capacity(ranges.len());
let mut merged = VecDeque::with_capacity(ranges.len());
let mut cur = ranges[0].clone();

for next in ranges.iter().skip(1) {
Expand All @@ -249,33 +210,64 @@ impl OrderRange {
cur.end = cur.end.max(next.end);
changed = true;
} else {
merged.push(cur);
merged.push_back(cur);
cur = next.clone();
}
}
merged.push(cur);
merged.push_back(cur);

if merged.len() == 1 {
*self = OrderRange::Range(merged[0].clone());
} else if changed {
mem::swap(ranges, &mut merged.into_iter().collect());
mem::swap(ranges, &mut merged);
}
}
}

fn sort(&mut self) {
if let OrderRange::Fragment(ranges) = self {
ranges.make_contiguous().sort_by(|lhs, rhs| lhs.start.cmp(&rhs.start));
fn push_inner(list: &mut VecDeque<Range<u64>>, range: Range<u64>) {
if list.is_empty() {
list.push_back(range);
} else {
let search_result = list.binary_search_by(|r| {
if is_continuous_range(r, &range) {
std::cmp::Ordering::Equal
} else if r.end < range.start {
std::cmp::Ordering::Less
} else {
std::cmp::Ordering::Greater
}
});

match search_result {
Ok(idx) => {
let old = &mut list[idx];
list[idx] = old.start.min(range.start)..old.end.max(range.end);
Self::squash_around(list, idx);
}
Err(idx) => {
list.insert(idx, range);
Self::squash_around(list, idx);
}
}
}
}

pub fn pop(&mut self) -> Option<Range<u64>> {
if self.is_empty() {
None
} else {
match self {
OrderRange::Range(range) => Some(mem::replace(range, 0..0)),
OrderRange::Fragment(list) => list.pop_front(),
fn squash_around(list: &mut VecDeque<Range<u64>>, idx: usize) {
if idx > 0 {
let prev = &list[idx - 1];
let cur = &list[idx];
if is_continuous_range(prev, cur) {
list[idx - 1] = prev.start.min(cur.start)..prev.end.max(cur.end);
list.remove(idx);
}
}

if idx < list.len() - 1 {
let next = &list[idx + 1];
let cur = &list[idx];
if is_continuous_range(cur, next) {
list[idx] = cur.start.min(next.start)..cur.end.max(next.end);
list.remove(idx + 1);
}
}
}
Expand All @@ -290,6 +282,22 @@ impl<'a> IntoIterator for &'a OrderRange {
}
}

impl Extend<Range<u64>> for OrderRange {
fn extend<T: IntoIterator<Item = Range<u64>>>(&mut self, other: T) {
self.make_fragment();
match self {
OrderRange::Fragment(ranges) => {
for range in other {
OrderRange::push_inner(ranges, range);
}

self.make_single();
}
_ => unreachable!(),
}
}
}

pub struct OrderRangeIter<'a> {
range: &'a OrderRange,
idx: usize,
Expand Down Expand Up @@ -374,13 +382,6 @@ mod tests {
assert_eq!(range, OrderRange::Range(0..30));
}

#[test]
fn test_range_sort() {
let mut range: OrderRange = vec![(20..30), (0..10), (10..50)].into();
range.sort();
assert_eq!(range, OrderRange::from(vec![(0..10), (10..50), (20..30)]));
}

#[test]
fn test_range_covered() {
assert!(!OrderRange::check_range_covered(&[0..1], &[2..3]));
Expand Down Expand Up @@ -433,7 +434,7 @@ mod tests {
}

#[test]
fn test_range_merge() {
fn test_range_extend() {
let mut range: OrderRange = (0..10).into();
range.merge((20..30).into());
assert_eq!(range, OrderRange::from(vec![(0..10), (20..30)]));
Expand Down
6 changes: 3 additions & 3 deletions y-octo/src/doc/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ impl DocStore {
let mut pending_delete_sets = HashMap::new();
Self::delete_item_inner(&mut pending_delete_sets, item, parent);
for (client, ranges) in pending_delete_sets {
self.delete_set.batch_push(client, ranges);
self.delete_set.batch_add_ranges(client, ranges);
}
}

Expand Down Expand Up @@ -713,7 +713,7 @@ impl DocStore {
idx += 1;
}
for (client, ranges) in pending_delete_sets {
self.delete_set.batch_push(client, ranges);
self.delete_set.batch_add_ranges(client, ranges);
}
}
}
Expand Down Expand Up @@ -809,7 +809,7 @@ impl DocStore {
})
.collect::<Vec<_>>();
if !ranges.is_empty() {
delete_set.batch_push(*client, ranges);
delete_set.batch_add_ranges(*client, ranges);
}
}

Expand Down

0 comments on commit dae318a

Please sign in to comment.