Skip to content

Commit

Permalink
fix: type ref unexpectedly released (#533)
Browse files Browse the repository at this point in the history
* fix: type ref unexpectedly released

* feat: optimize duplicate padding items

* feat: improve gc update merge performance

* fix: clear flag correctly

* fix: serialize infinite loop after gc

* fix: assert

* chore: reuse exists function

* feat: add test case for pending update repeated applied
  • Loading branch information
darkskygit authored and Brooooooklyn committed Sep 8, 2023
1 parent bcef699 commit 3b37fe7
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 30 deletions.
38 changes: 19 additions & 19 deletions .github/actions/setup-rust/action.yml
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
name: "Rust setup"
description: "Rust setup, including cache configuration"
inputs:
components:
description: "Cargo components"
required: false
targets:
description: "Cargo target"
required: false
toolchain:
description: "Rustup toolchain"
required: false
default: "stable"
components:
description: "Cargo components"
required: false
targets:
description: "Cargo target"
required: false
toolchain:
description: "Rustup toolchain"
required: false
default: "stable"

runs:
using: "composite"
steps:
- name: Setup Rust
uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ inputs.toolchain }}
targets: ${{ inputs.targets }}
components: ${{ inputs.components }}
- uses: Swatinem/rust-cache@v2
using: "composite"
steps:
- name: Setup Rust
uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ inputs.toolchain }}
targets: ${{ inputs.targets }}
components: ${{ inputs.components }}
- uses: Swatinem/rust-cache@v2
3 changes: 2 additions & 1 deletion y-octo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ nom = "7.1"
ordered-float = "3.8"
rand = "0.8"
rand_chacha = "0.3"
serde = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"

[features]
bench = []
debug = []
large_refs = []

[target.'cfg(fuzzing)'.dependencies]
Expand Down
4 changes: 2 additions & 2 deletions y-octo/src/doc/codec/item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl ItemFlags {

#[inline(always)]
pub fn clear_countable(&self) {
self.clear(item_flags::ITEM_COUNTABLE);
self.clear(!item_flags::ITEM_COUNTABLE);
}

#[inline(always)]
Expand All @@ -108,7 +108,7 @@ impl ItemFlags {

#[inline(always)]
pub fn clear_deleted(&self) {
self.clear(item_flags::ITEM_DELETED);
self.clear(!item_flags::ITEM_DELETED);
}
}

Expand Down
26 changes: 26 additions & 0 deletions y-octo/src/doc/codec/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl Update {
// insert [Node::Skip] if structs[index].id().clock + structs[index].len() <
// structs[index + 1].id().clock
let mut index = 0;
let mut merged_index = vec![];
while index < structs.len() - 1 {
let cur = &structs[index];
let next = &structs[index + 1];
Expand All @@ -157,10 +158,35 @@ impl Update {
Node::new_skip((cur.id().client, clock_end).into(), next_clock - clock_end),
);
index += 1;
} else if cur.id().clock == next_clock {
if cur.deleted() == next.deleted()
&& cur.last_id() == next.last_id()
&& cur.left() == next.left()
&& cur.right() == next.right()
{
// merge two nodes, mark the index
merged_index.push(index + 1);
} else {
debug!("merge failed: {:?} {:?}", cur, next)
}
}

index += 1;
}

{
// prune the merged nodes
let mut new_structs = VecDeque::with_capacity(structs.len() - merged_index.len());
let mut next_remove_idx = 0;
for (idx, val) in structs.drain(..).enumerate() {
if next_remove_idx < merged_index.len() && idx == merged_index[next_remove_idx] {
next_remove_idx += 1;
} else {
new_structs.push_back(val);
}
}
structs.extend(new_structs);
}
}
}

Expand Down
9 changes: 8 additions & 1 deletion y-octo/src/doc/common/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,14 @@ impl OrderRange {
return;
}
*self = match (raw, other) {
(OrderRange::Range(a), OrderRange::Range(b)) => OrderRange::Fragment(vec![a, b]),
(OrderRange::Range(a), OrderRange::Range(b)) => {
if is_continuous_range(&a, &b) {
// merge intersected range
OrderRange::Range(a.start.min(b.start)..a.end.max(b.end))
} else {
OrderRange::Fragment(vec![a, b])
}
}
(OrderRange::Fragment(mut a), OrderRange::Range(b)) => {
a.push(b);
OrderRange::Fragment(a)
Expand Down
93 changes: 92 additions & 1 deletion y-octo/src/doc/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@ use std::collections::HashMap;
use super::{publisher::DocPublisher, store::StoreRef, *};
use crate::sync::{Arc, RwLock};

#[cfg(feature = "debug")]
#[derive(Debug, Clone)]
pub struct DocStoreStatus {
pub nodes: usize,
pub delete_sets: usize,
pub types: usize,
pub dangling_types: usize,
pub pending_nodes: usize,
}

/// [DocOptions] used to create a new [Doc]
///
/// ```
Expand Down Expand Up @@ -158,6 +168,19 @@ impl Doc {
self.store.read().unwrap().history(client)
}

#[cfg(feature = "debug")]
pub fn store_status(&self) -> DocStoreStatus {
let store = self.store.read().unwrap();

DocStoreStatus {
nodes: store.total_nodes(),
delete_sets: store.total_delete_sets(),
types: store.total_types(),
dangling_types: store.total_dangling_types(),
pending_nodes: store.total_pending_nodes(),
}
}

pub fn options(&self) -> &DocOptions {
&self.opts
}
Expand Down Expand Up @@ -206,7 +229,7 @@ impl Doc {
if pending_update
.missing_state
.iter()
.any(|(client, clock)| store.get_state(*client) > *clock)
.any(|(client, clock)| *clock < store.get_state(*client))
{
// new update has been applied to the doc, need to re-integrate
retry = true;
Expand Down Expand Up @@ -497,4 +520,72 @@ mod tests {
assert_eq!(count.load(Ordering::SeqCst), 2);
});
}

#[test]
fn test_repeated_applied_pending_update() {
// generate a pending update
// update: [1, 1, 1, 0, 39, 1, 4, 116, 101, 115, 116, 3, 109, 97, 112, 1, 0]
// update: [1, 1, 1, 1, 40, 0, 1, 0, 11, 115, 117, 98, 95, 109, 97, 112, 95,
// 107, 101, 121, 1, 119, 13, 115, 117, 98, 95, 109, 97, 112, 95, 118, 97, 108,
// 117, 101, 0]
// {
// let doc1 = Doc::default();

// doc1.subscribe(|update| {
// println!("update: {:?}", update);
// });

// let mut map = doc1.get_or_create_map("test").unwrap();
// std::thread::sleep(std::time::Duration::from_millis(500));

// let mut sub_map = doc1.create_map().unwrap();
// map.insert("map", sub_map.clone()).unwrap();
// std::thread::sleep(std::time::Duration::from_millis(500));

// sub_map.insert("sub_map_key", "sub_map_value").unwrap();
// std::thread::sleep(std::time::Duration::from_millis(500));
// }

loom_model!({
let mut doc = Doc::default();

doc.apply_update_from_binary(vec![
1, 1, 1, 1, 40, 0, 1, 0, 11, 115, 117, 98, 95, 109, 97, 112, 95, 107, 101, 121, 1, 119, 13, 115, 117,
98, 95, 109, 97, 112, 95, 118, 97, 108, 117, 101, 0,
])
.unwrap();

let pending_size = doc
.store
.read()
.unwrap()
.pending
.as_ref()
.unwrap()
.structs
.iter()
.map(|s| s.1.len())
.sum::<usize>();
doc.apply_update_from_binary(vec![
1, 1, 1, 1, 40, 0, 1, 0, 11, 115, 117, 98, 95, 109, 97, 112, 95, 107, 101, 121, 1, 119, 13, 115, 117,
98, 95, 109, 97, 112, 95, 118, 97, 108, 117, 101, 0,
])
.unwrap();

// pending nodes should not grow up after apply same pending update
assert_eq!(
pending_size,
doc.store
.read()
.unwrap()
.pending
.as_ref()
.unwrap()
.structs
.iter()
.map(|s| s.1.len())
.sum::<usize>()
);
});
}
}
41 changes: 38 additions & 3 deletions y-octo/src/doc/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,37 @@ impl DocStore {
self.items.keys().cloned().collect()
}

#[cfg(feature = "debug")]
pub fn total_nodes(&self) -> usize {
self.items.values().map(|v| v.len()).sum()
}

#[cfg(feature = "debug")]
pub fn total_delete_sets(&self) -> usize {
self.delete_set
.values()
.map(|v| match v {
OrderRange::Range(_) => 1,
OrderRange::Fragment(f) => f.len(),
})
.sum()
}

#[cfg(feature = "debug")]
pub fn total_types(&self) -> usize {
self.types.len()
}

#[cfg(feature = "debug")]
pub fn total_dangling_types(&self) -> usize {
self.dangling_types.len()
}

#[cfg(feature = "debug")]
pub fn total_pending_nodes(&self) -> usize {
self.pending.as_ref().map(|p| p.structs.len()).unwrap_or(0)
}

pub fn get_state(&self, client: Client) -> Clock {
if let Some(structs) = self.items.get(&client) {
if let Some(last_struct) = structs.back() {
Expand Down Expand Up @@ -808,6 +839,8 @@ impl DocStore {
} else {
let mut item = unsafe { item_ref.get_mut_unchecked() };
item.content = Arc::new(Content::Deleted(item.len()));
item.flags.clear_countable();
debug_assert!(!item.flags.countable());
}
}

Expand Down Expand Up @@ -861,15 +894,15 @@ impl DocStore {
let mut idx = nodes.len() - 1;

while idx > 0 && idx >= first_change {
idx = idx.saturating_sub(Self::merge_with_lefts(nodes, idx)? + 1);
idx = idx.saturating_sub(Self::merge_with_lefts(nodes, idx) + 1);
}
}

self.last_optimized_state = state;
Ok(())
}

fn merge_with_lefts(nodes: &mut VecDeque<Node>, idx: usize) -> JwstCodecResult<usize> {
fn merge_with_lefts(nodes: &mut VecDeque<Node>, idx: usize) -> usize {
let mut pos = idx;
loop {
if pos == 0 {
Expand Down Expand Up @@ -951,7 +984,9 @@ impl DocStore {
pos -= 1;
}
nodes.drain(pos + 1..=idx);
Ok(idx - pos)

// return the index of processed items
idx - pos
}
}

Expand Down
4 changes: 3 additions & 1 deletion y-octo/src/doc/types/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ pub(crate) trait MapType: AsInner<Inner = YTypeRef> {
let map = inner
.map
.iter()
.filter(|(_, v)| v.get().map(|item| !item.deleted()).unwrap_or(false))
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<Vec<(String, Somr<Item>)>>();

Expand All @@ -125,11 +126,12 @@ impl Iterator for MapIterator {
while self.index < len {
let (name, node) = self.nodes[self.index].clone();
if let Some(item) = node.get() {
self.index += 1;
if item.deleted() {
continue;
}

self.index += 1;

return item.content.as_ref().try_into().ok().map(|item| (name, item));
}
}
Expand Down
2 changes: 1 addition & 1 deletion y-octo/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub use doc::{
CrdtWriter, Doc, DocOptions, Id, Map, RawDecoder, RawEncoder, StateVector, Text, Update, Value,
};
pub(crate) use doc::{Content, Item, HASHMAP_SAFE_CAPACITY};
use log::{debug, warn};
use log::{debug, info, warn};
use nom::IResult;
pub use protocol::{
read_sync_message, write_sync_message, AwarenessState, AwarenessStates, DocMessage, SyncMessage, SyncMessageScanner,
Expand Down
2 changes: 1 addition & 1 deletion y-octo/yrs-is-unsafe/bin/mem_usage.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use yrs::{updates::decoder::Decode, Update};

fn main() {
if let Ok(_) = Update::decode_v1(&[255, 255, 255, 122]) {};
if Update::decode_v1(&[255, 255, 255, 122]).is_ok() {};
}

0 comments on commit 3b37fe7

Please sign in to comment.