From bcef699f6abe1265a54863344f78a29d3f1058d7 Mon Sep 17 00:00:00 2001 From: DarkSky <25152247+darkskygit@users.noreply.github.com> Date: Mon, 4 Sep 2023 15:03:20 +0800 Subject: [PATCH] feat: add history parsing & update pub/sub --- .github/actions/setup-rust/action.yml | 40 ++-- y-octo/fuzz/fuzz_targets/apply_update.rs | 23 +- .../fuzz/fuzz_targets/codec_doc_any_struct.rs | 6 +- y-octo/src/doc/codec/id.rs | 1 + y-octo/src/doc/codec/item.rs | 15 ++ y-octo/src/doc/codec/update.rs | 4 + y-octo/src/doc/document.rs | 30 ++- y-octo/src/doc/history.rs | 204 ++++++++++++++++++ y-octo/src/doc/mod.rs | 2 + y-octo/src/doc/store.rs | 10 +- 10 files changed, 286 insertions(+), 49 deletions(-) create mode 100644 y-octo/src/doc/history.rs diff --git a/.github/actions/setup-rust/action.yml b/.github/actions/setup-rust/action.yml index 4e465e5..a1742c8 100644 --- a/.github/actions/setup-rust/action.yml +++ b/.github/actions/setup-rust/action.yml @@ -1,24 +1,24 @@ -name: "Y-Octo Rust setup" +name: "Rust setup" description: "Rust setup, including cache configuration" inputs: - components: - description: "Cargo components" - required: false - targets: - description: "Cargo target" - required: false - toolchain: - description: "Rustup toolchain" - required: false - default: "stable" + components: + description: "Cargo components" + required: false + targets: + description: "Cargo target" + required: false + toolchain: + description: "Rustup toolchain" + required: false + default: "stable" runs: - using: "composite" - steps: - - name: Setup Rust - uses: dtolnay/rust-toolchain@stable - with: - toolchain: ${{ inputs.toolchain }} - targets: ${{ inputs.targets }} - components: ${{ inputs.components }} - - uses: Swatinem/rust-cache@v2 + using: "composite" + steps: + - name: Setup Rust + uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{ inputs.toolchain }} + targets: ${{ inputs.targets }} + components: ${{ inputs.components }} + - uses: Swatinem/rust-cache@v2 diff --git a/y-octo/fuzz/fuzz_targets/apply_update.rs b/y-octo/fuzz/fuzz_targets/apply_update.rs index d052964..7c66837 100644 --- a/y-octo/fuzz/fuzz_targets/apply_update.rs +++ b/y-octo/fuzz/fuzz_targets/apply_update.rs @@ -1,10 +1,11 @@ #![no_main] -use libfuzzer_sys::fuzz_target; use std::collections::HashSet; + +use libfuzzer_sys::fuzz_target; use y_octo_utils::{ - gen_nest_type_from_nest_type, gen_nest_type_from_root, CRDTParam, ManipulateSource, OpType, - OpsRegistry, YrsNestType, + gen_nest_type_from_nest_type, gen_nest_type_from_root, CRDTParam, ManipulateSource, OpType, OpsRegistry, + YrsNestType, }; use yrs::Transact; @@ -22,11 +23,7 @@ fuzz_target!(|crdt_params: Vec| { match crdt_param.op_type { OpType::HandleCurrent => { if cur_crdt_nest_type.is_some() { - ops_registry.operate_yrs_nest_type( - &doc, - cur_crdt_nest_type.clone().unwrap(), - crdt_param, - ); + ops_registry.operate_yrs_nest_type(&doc, cur_crdt_nest_type.clone().unwrap(), crdt_param); } } OpType::CreateCRDTNestType => { @@ -34,14 +31,10 @@ fuzz_target!(|crdt_params: Vec| { None => gen_nest_type_from_root(&mut doc, &crdt_param), Some(mut nest_type) => match crdt_param.manipulate_source { ManipulateSource::CurrentNestType => Some(nest_type), - ManipulateSource::NewNestTypeFromYDocRoot => { - gen_nest_type_from_root(&mut doc, &crdt_param) + ManipulateSource::NewNestTypeFromYDocRoot => gen_nest_type_from_root(&mut doc, &crdt_param), + ManipulateSource::NewNestTypeFromCurrent => { + gen_nest_type_from_nest_type(&mut doc, crdt_param.clone(), &mut nest_type) } - ManipulateSource::NewNestTypeFromCurrent => gen_nest_type_from_nest_type( - &mut doc, - crdt_param.clone(), - &mut nest_type, - ), }, }; } diff --git a/y-octo/fuzz/fuzz_targets/codec_doc_any_struct.rs b/y-octo/fuzz/fuzz_targets/codec_doc_any_struct.rs index fd0ee70..60c6eb1 100644 --- a/y-octo/fuzz/fuzz_targets/codec_doc_any_struct.rs +++ b/y-octo/fuzz/fuzz_targets/codec_doc_any_struct.rs @@ -14,11 +14,7 @@ fn get_random_string() -> String { fuzz_target!(|data: Vec| { { - let any = Any::Object( - data.iter() - .map(|a| (get_random_string(), a.clone())) - .collect(), - ); + let any = Any::Object(data.iter().map(|a| (get_random_string(), a.clone())).collect()); let mut buffer = RawEncoder::default(); if let Err(e) = any.write(&mut buffer) { diff --git a/y-octo/src/doc/codec/id.rs b/y-octo/src/doc/codec/id.rs index e633645..0c0d41e 100644 --- a/y-octo/src/doc/codec/id.rs +++ b/y-octo/src/doc/codec/id.rs @@ -44,6 +44,7 @@ impl Add for Id { } } +#[allow(clippy::incorrect_partial_ord_impl_on_ord_type)] impl PartialOrd for Id { fn partial_cmp(&self, other: &Self) -> Option { match self.client.cmp(&other.client) { diff --git a/y-octo/src/doc/codec/item.rs b/y-octo/src/doc/codec/item.rs index ce089aa..50d77ec 100644 --- a/y-octo/src/doc/codec/item.rs +++ b/y-octo/src/doc/codec/item.rs @@ -214,6 +214,21 @@ impl Item { } } + pub fn resolve_parent(&self) -> Option<(Option, Option)> { + if let Some(item) = self.left.as_ref().map(|n| n.as_item()).and_then(|i| i.get().cloned()) { + if item.parent.is_none() { + if let Some(item) = item.right.map(|n| n.as_item()).and_then(|i| i.get().cloned()) { + return Some((item.parent.clone(), item.parent_sub.clone())); + } + } else { + return Some((item.parent.clone(), item.parent_sub.clone())); + } + } else if let Some(item) = self.right.as_ref().map(|n| n.as_item()).and_then(|i| i.get().cloned()) { + return Some((item.parent.clone(), item.parent_sub.clone())); + } + None + } + pub fn len(&self) -> u64 { self.content.clock_len() } diff --git a/y-octo/src/doc/codec/update.rs b/y-octo/src/doc/codec/update.rs index 4c68445..09d3fba 100644 --- a/y-octo/src/doc/codec/update.rs +++ b/y-octo/src/doc/codec/update.rs @@ -164,6 +164,10 @@ impl Update { } } + pub fn is_content_empty(&self) -> bool { + self.structs.is_empty() + } + pub fn is_empty(&self) -> bool { self.structs.is_empty() && self.delete_set.is_empty() } diff --git a/y-octo/src/doc/document.rs b/y-octo/src/doc/document.rs index b7b9025..ad7c38e 100644 --- a/y-octo/src/doc/document.rs +++ b/y-octo/src/doc/document.rs @@ -107,7 +107,7 @@ pub struct Doc { opts: DocOptions, pub(crate) store: StoreRef, - pub(crate) publisher: Arc, + pub publisher: Arc, } unsafe impl Send for Doc {} @@ -150,6 +150,14 @@ impl Doc { self.client_id } + pub fn clients(&self) -> Vec { + self.store.read().unwrap().clients() + } + + pub fn history(&self, client: u64) -> Option> { + self.store.read().unwrap().history(client) + } + pub fn options(&self) -> &DocOptions { &self.opts } @@ -170,16 +178,16 @@ impl Doc { Ok(doc) } - pub fn apply_update_from_binary(&mut self, update: Vec) -> JwstCodecResult { + pub fn apply_update_from_binary(&mut self, update: Vec) -> JwstCodecResult { let mut decoder = RawDecoder::new(update); let update = Update::read(&mut decoder)?; - self.apply_update(update)?; - Ok(()) + self.apply_update(update) } - pub fn apply_update(&mut self, mut update: Update) -> JwstCodecResult { + pub fn apply_update(&mut self, mut update: Update) -> JwstCodecResult { let mut store = self.store.write().unwrap(); let mut retry = false; + let before_state = store.get_state_vector(); loop { for (mut s, offset) in update.iter(store.get_state_vector()) { if let Node::Item(item) = &mut s { @@ -238,7 +246,7 @@ impl Doc { } } - Ok(()) + store.diff_state_vector(&before_state) } pub fn keys(&self) -> Vec { @@ -293,13 +301,21 @@ impl Doc { } pub fn encode_state_as_update_v1(&self, sv: &StateVector) -> JwstCodecResult> { - let update = self.store.read().unwrap().diff_state_vector(sv)?; + let update = self.encode_state_as_update(sv)?; let mut encoder = RawEncoder::default(); update.write(&mut encoder)?; Ok(encoder.into_inner()) } + pub fn encode_update(&self) -> JwstCodecResult { + self.encode_state_as_update(&StateVector::default()) + } + + pub fn encode_state_as_update(&self, sv: &StateVector) -> JwstCodecResult { + self.store.read().unwrap().diff_state_vector(sv) + } + pub fn get_state_vector(&self) -> StateVector { self.store.read().unwrap().get_state_vector() } diff --git a/y-octo/src/doc/history.rs b/y-octo/src/doc/history.rs new file mode 100644 index 0000000..2fc4532 --- /dev/null +++ b/y-octo/src/doc/history.rs @@ -0,0 +1,204 @@ +use std::collections::{HashMap, VecDeque}; + +use serde::Serialize; + +use super::*; + +/// The ancestor table is a table that records the names of all the ancestors of +/// a node. It is generated every time the history is rebuilt and is used to +/// quickly look up the parent path of a CRDT item. The process of generating +/// this table involves traversing the item nodes and recording their ID as well +/// as their complete name as a parent. +/// TODO: The current implementation is a simple implementation with a lot of +/// room for optimization and should be optimized thereafter +#[derive(Debug)] +struct AncestorTable(HashMap); + +impl AncestorTable { + fn new(items: &[&Item]) -> Self { + let mut name_map: HashMap = HashMap::new(); + let mut padding_ptr: VecDeque<(&Item, usize)> = + VecDeque::from(items.iter().map(|i| (<&Item>::clone(i), 0)).collect::>()); + + while let Some((item, retry)) = padding_ptr.pop_back() { + if retry > 5 { + debug!("retry failed: {:?}, {:?}, {:?}", item, retry, padding_ptr); + break; + } + let (parent, parent_sub) = { + let parent = if item.parent.is_none() { + if let Some((parent, parent_sub)) = item.resolve_parent() { + Self::parse_parent(&name_map, parent).map(|parent| (parent, parent_sub)) + } else { + Some(("unknown".to_owned(), None)) + } + } else { + Self::parse_parent(&name_map, item.parent.clone()).map(|parent| (parent, item.parent_sub.clone())) + }; + + if let Some(parent) = parent { + parent + } else { + padding_ptr.push_front((item, retry + 1)); + continue; + } + }; + + let parent = if let Some(parent_sub) = parent_sub { + format!("{parent}.{parent_sub}") + } else { + parent + }; + + name_map.insert(item.id, parent.clone()); + } + + Self(name_map) + } + + fn parse_parent(name_map: &HashMap, parent: Option) -> Option { + match parent { + None => Some("unknown".to_owned()), + Some(Parent::Type(ptr)) => ptr.ty().and_then(|ty| { + ty.item + .get() + .and_then(|i| name_map.get(&i.id)) + .cloned() + .or(ty.root_name.clone()) + }), + Some(Parent::String(name)) => Some(name.to_string()), + Some(Parent::Id(id)) => name_map.get(&id).cloned(), + } + } + + fn get(&self, id: &Id) -> Option { + self.0.get(id).cloned() + } +} + +#[derive(Debug, Serialize, PartialEq)] +pub struct RawHistory { + id: String, + parent: String, + content: String, +} + +struct SortedNodes<'a> { + nodes: Vec<(&'a Client, &'a VecDeque)>, + current: Option>, +} + +impl<'a> SortedNodes<'a> { + pub fn new(mut nodes: Vec<(&'a Client, &'a VecDeque)>) -> Self { + nodes.sort_by(|a, b| b.0.cmp(a.0)); + let current = nodes.pop().map(|(_, v)| v.clone()); + Self { nodes, current } + } +} + +impl Iterator for SortedNodes<'_> { + type Item = Node; + + fn next(&mut self) -> Option { + if let Some(current) = self.current.as_mut() { + if let Some(node) = current.pop_back() { + return Some(node); + } + } + + if let Some((_, nodes)) = self.nodes.pop() { + self.current = Some(nodes.clone()); + self.next() + } else { + None + } + } +} + +impl DocStore { + pub fn history(&self, client: u64) -> Option> { + let items = SortedNodes::new(self.items.iter().collect::>()) + .filter_map(|n| n.as_item().get().cloned()) + .collect::>(); + let mut items = items.iter().collect::>(); + items.sort_by(|a, b| a.id.cmp(&b.id)); + + let mut histories = vec![]; + let parent_map = AncestorTable::new(&items); + + for item in items { + if item.deleted() { + continue; + } + if let Some(parent) = parent_map.get(&item.id) { + if item.id.client == client || client == 0 { + histories.push(RawHistory { + id: item.id.to_string(), + parent, + content: Value::try_from(item.content.as_ref()) + .map(|v| v.to_string()) + .unwrap_or("unknown".to_owned()), + }) + } + } else { + info!("headless id: {:?}", item.id); + } + } + + Some(histories) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn parse_history_client_test() { + loom_model!({ + let doc = Doc::default(); + let mut map = doc.get_or_create_map("map").unwrap(); + let mut sub_map = doc.create_map().unwrap(); + map.insert("sub_map", sub_map.clone()).unwrap(); + sub_map.insert("key", "value").unwrap(); + + assert_eq!(doc.clients()[0], doc.client()); + }); + } + + #[test] + fn parse_history_test() { + loom_model!({ + let doc = Doc::default(); + let mut map = doc.get_or_create_map("map").unwrap(); + let mut sub_map = doc.create_map().unwrap(); + map.insert("sub_map", sub_map.clone()).unwrap(); + sub_map.insert("key", "value").unwrap(); + + let history = doc.store.read().unwrap().history(0).unwrap(); + + let mut update = doc.encode_update().unwrap(); + let items = update + .iter(StateVector::default()) + .filter_map(|n| n.0.as_item().get().cloned()) + .collect::>(); + let items = items.iter().collect::>(); + + let mut mock_histories: Vec = vec![]; + let parent_map = AncestorTable::new(&items); + for item in items { + if let Some(parent) = parent_map.get(&item.id) { + mock_histories.push(RawHistory { + id: item.id.to_string(), + parent, + content: Value::try_from(item.content.as_ref()) + .map(|v| v.to_string()) + .unwrap_or("unknown".to_owned()), + }) + } + } + + assert_eq!(history, mock_histories); + }); + } +} diff --git a/y-octo/src/doc/mod.rs b/y-octo/src/doc/mod.rs index fde2c27..6308caf 100644 --- a/y-octo/src/doc/mod.rs +++ b/y-octo/src/doc/mod.rs @@ -2,6 +2,7 @@ mod awareness; mod codec; mod common; mod document; +mod history; mod publisher; mod store; mod types; @@ -11,6 +12,7 @@ pub use awareness::{Awareness, AwarenessEvent}; pub use codec::*; pub use common::*; pub use document::{Doc, DocOptions}; +pub use history::RawHistory; pub(crate) use store::DocStore; pub use types::*; pub use utils::*; diff --git a/y-octo/src/doc/store.rs b/y-octo/src/doc/store.rs index c9ebf2b..89a1877 100644 --- a/y-octo/src/doc/store.rs +++ b/y-octo/src/doc/store.rs @@ -49,6 +49,10 @@ impl DocStore { self.client } + pub fn clients(&self) -> Vec { + self.items.keys().cloned().collect() + } + pub fn get_state(&self, client: Client) -> Clock { if let Some(structs) = self.items.get(&client) { if let Some(last_struct) = structs.back() { @@ -558,7 +562,9 @@ impl DocStore { if item.parent_sub.is_none() && item.countable() { if let Some(parent) = parent { - parent.len -= item.len(); + if parent.len != 0 { + parent.len -= item.len(); + } } else if let Some(Parent::Type(ty)) = &item.parent { ty.ty_mut().unwrap().len -= item.len(); } @@ -828,7 +834,7 @@ impl DocStore { } ty.start = Somr::none(); - for (_, item) in &ty.map { + for item in ty.map.values() { if let Some(item) = item.get() { Self::gc_item_by_id(items, item.id, true)?; }