Skip to content

Commit

Permalink
feat: exclude pending state in broadcast update (#546)
Browse files Browse the repository at this point in the history
* feat: don't include pending state in broadcast update

* feat: enable large refs for keck server

* feat: put apply update into a separate thread
  • Loading branch information
darkskygit authored Oct 10, 2023
1 parent 91c497f commit a342180
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 16 deletions.
2 changes: 1 addition & 1 deletion apps/keck/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }

jwst-core = { workspace = true }
jwst-core = { workspace = true, features = ["large_refs"] }
jwst-logger = { workspace = true }
jwst-rpc = { workspace = true }
jwst-storage = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions libs/jwst-codec/src/doc/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl Doc {
}
}

store.diff_state_vector(&before_state)
store.diff_state_vector(&before_state, false)
}

pub fn keys(&self) -> Vec<String> {
Expand Down Expand Up @@ -356,7 +356,7 @@ impl Doc {
}

pub fn encode_state_as_update(&self, sv: &StateVector) -> JwstCodecResult<Update> {
self.store.read().unwrap().diff_state_vector(sv)
self.store.read().unwrap().diff_state_vector(sv, true)
}

pub fn get_state_vector(&self) -> StateVector {
Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-codec/src/doc/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl DocPublisher {
);

history.resolve_with_store(&store);
let (binary, history) = match store.diff_state_vector(&last_update) {
let (binary, history) = match store.diff_state_vector(&last_update, false) {
Ok(update) => {
drop(store);

Expand Down
8 changes: 5 additions & 3 deletions libs/jwst-codec/src/doc/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ impl DocStore {
diff
}

pub fn diff_state_vector(&self, sv: &StateVector) -> JwstCodecResult<Update> {
pub fn diff_state_vector(&self, sv: &StateVector, with_pending: bool) -> JwstCodecResult<Update> {
let update_structs = Self::diff_structs(&self.items, sv)?;

let mut update = Update {
Expand All @@ -737,8 +737,10 @@ impl DocStore {
..Update::default()
};

if let Some(pending) = &self.pending {
Update::merge_into(&mut update, [pending.clone()])
if with_pending {
if let Some(pending) = &self.pending {
Update::merge_into(&mut update, [pending.clone()])
}
}

Ok(update)
Expand Down
6 changes: 5 additions & 1 deletion libs/jwst-core/src/workspaces/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ impl Workspace {
.ok()
}),
DocMessage::Step2(update) => {
let len = update.len();
if let Ok(update) = Update::read(&mut RawDecoder::new(update)) {
debug!("step2 apply update: {len}");
if let Err(e) = doc.apply_update(update) {
warn!("failed to apply update: {:?}", e);
}
Expand All @@ -118,7 +120,9 @@ impl Workspace {

let mut encoder = RawEncoder::default();
update.write(&mut encoder)?;
Ok(Some(encoder.into_inner()))
let update = encoder.into_inner();
debug!("step3 return changed update: {}", update.len());
Ok(Some(update))
})
.map_err(|e| warn!("failed to apply update: {:?}", e))
.ok()
Expand Down
9 changes: 7 additions & 2 deletions libs/jwst-rpc/src/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,13 @@ pub async fn subscribe(workspace: &Workspace, identifier: String, sender: Broadc
{
let sender = sender.clone();
let workspace_id = workspace.id();
workspace.subscribe_doc(move |update, _history| {
debug!("workspace {} changed: {}bytes", workspace_id, update.len());
workspace.subscribe_doc(move |update, history| {
debug!(
"workspace {} changed: {}bytes, {} histories",
workspace_id,
update.len(),
history.len()
);

match encode_update_with_guid(update.to_vec(), workspace_id.clone())
.and_then(|update_with_guid| encode_update_as_message(update.to_vec()).map(|u| (update_with_guid, u)))
Expand Down
20 changes: 14 additions & 6 deletions libs/jwst-rpc/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ use chrono::Utc;
use jwst_codec::{CrdtReader, RawDecoder};
use jwst_core::{DocStorage, Workspace};
use jwst_storage::{JwstStorage, JwstStorageResult};
use tokio::sync::{
broadcast::{channel as broadcast, error::RecvError, Receiver as BroadcastReceiver, Sender as BroadcastSender},
mpsc::{Receiver as MpscReceiver, Sender as MpscSender},
Mutex,
use tokio::{
sync::{
broadcast::{channel as broadcast, error::RecvError, Receiver as BroadcastReceiver, Sender as BroadcastSender},
mpsc::{Receiver as MpscReceiver, Sender as MpscSender},
Mutex,
},
task::spawn_blocking,
};

use super::{
Expand Down Expand Up @@ -155,7 +158,7 @@ pub trait RpcContextImpl<'a> {
// collect messages from remote
let identifier = identifier.to_owned();
let id = id.to_string();
let mut workspace = self
let workspace = self
.get_storage()
.get_workspace(&id)
.await
Expand Down Expand Up @@ -185,7 +188,12 @@ pub trait RpcContextImpl<'a> {
let updates = std::mem::take(&mut updates);
let updates_len = updates.len();
let ts = Instant::now();
let message = workspace.sync_messages(updates);
let message = {
let mut workspace = workspace.clone();
spawn_blocking(move || workspace.sync_messages(updates))
.await
.unwrap()
};
if ts.elapsed().as_micros() > 50 {
debug!(
"apply {updates_len} remote update cost: {}ms",
Expand Down

1 comment on commit a342180

@vercel
Copy link

@vercel vercel bot commented on a342180 Oct 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.