Skip to content

Commit

Permalink
chore: upgrade apis for jwst-codec
Browse files Browse the repository at this point in the history
  • Loading branch information
darkskygit committed Dec 28, 2023
1 parent f0cc5ac commit 4737ffa
Show file tree
Hide file tree
Showing 16 changed files with 81 additions and 55 deletions.
4 changes: 2 additions & 2 deletions apps/doc_merger/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ fn jwst_merge(path: &str, output: &str) {
let mut doc = Doc::default();
for (i, update) in updates.iter().enumerate() {
println!("apply update{i} {} bytes", update.len());
doc.apply_update_from_binary(update.clone()).unwrap();
doc.apply_update_from_binary_v1(update.clone()).unwrap();
println!("status: {:?}", doc.store_status());
}
let ts = Instant::now();
Expand All @@ -93,7 +93,7 @@ fn jwst_merge(path: &str, output: &str) {

{
let mut doc = Doc::default();
doc.apply_update_from_binary(binary.clone()).unwrap();
doc.apply_update_from_binary_v1(binary.clone()).unwrap();
let new_binary = doc.encode_update_v1().unwrap();
let new_json = serde_json::to_string_pretty(&doc.get_map("space:blocks").unwrap()).unwrap();
assert_json_diff::assert_json_eq!(doc.get_map("space:blocks").unwrap(), json);
Expand Down
2 changes: 1 addition & 1 deletion apps/keck/src/server/api/blocks/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl From<(&str, &History)> for BlockHistory {
fn from((workspace_id, history): (&str, &History)) -> Self {
Self {
workspace_id: workspace_id.into(),
field_name: history.field_name.clone(),
field_name: history.field_name.as_ref().map(|s| s.to_string()),
parent: history.parent.iter().map(|id| id.to_string()).collect::<Vec<_>>(),
content: history.content.clone(),
action: history.action.to_string(),
Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-codec/src/doc/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ impl ToString for HistoryAction {
#[derive(Debug, PartialEq)]
pub struct History {
pub id: String,
pub field_name: Option<String>,
pub field_name: Option<SmolStr>,
pub parent: Vec<String>,
pub content: String,
pub action: HistoryAction,
Expand Down
8 changes: 3 additions & 5 deletions libs/jwst-core/src/block/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,8 @@ mod tests {
#[test]
fn test_multiple_layer_space_clone() {
let mut doc1 = Doc::default();
doc1.apply_update(
Update::from_ybinary1(include_bytes!("../../fixtures/test_multi_layer.bin").to_vec()).unwrap(),
)
.unwrap();
doc1.apply_update(Update::decode_v1(include_bytes!("../../fixtures/test_multi_layer.bin").to_vec()).unwrap())
.unwrap();

let mut ws1 = Workspace::from_doc(doc1, "test").unwrap();

Expand All @@ -248,7 +246,7 @@ mod tests {
};

let mut doc2 = Doc::default();
doc2.apply_update(Update::from_ybinary1(new_update).unwrap()).unwrap();
doc2.apply_update(Update::decode_v1(new_update).unwrap()).unwrap();

let doc1 = ws1.doc;

Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-core/src/space/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ mod test {
let new_doc = {
let update = doc
.encode_state_as_update_v1(&StateVector::default())
.and_then(|update| Update::from_ybinary1(update))
.and_then(|update| Update::decode_v1(update))
.unwrap();

let mut doc = Doc::default();
Expand Down
42 changes: 23 additions & 19 deletions libs/jwst-core/src/workspaces/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl Workspace {
trace!("processing message: {:?}", msg);
match msg {
SyncMessage::Doc(msg) => match msg {
DocMessage::Step1(sv) => StateVector::read(&mut RawDecoder::new(sv)).ok().and_then(|sv| {
DocMessage::Step1(sv) => StateVector::read(&mut RawDecoder::new(&sv)).ok().and_then(|sv| {
debug!("step1 get sv: {sv:?}");
doc.encode_state_as_update_v1(&sv)
.map(|update| {
Expand All @@ -103,31 +103,35 @@ impl Workspace {
}),
DocMessage::Step2(update) => {
let len = update.len();
if let Ok(update) = Update::read(&mut RawDecoder::new(update)) {
if let Ok(update) = Update::read(&mut RawDecoder::new(&update)) {
debug!("step2 apply update: {len}");
if let Err(e) = doc.apply_update(update) {
warn!("failed to apply update: {:?}", e);
}
}
None
}
DocMessage::Update(update) => doc
.apply_update_from_binary(update)
.and_then(|update| {
if update.is_content_empty() {
return Ok(None);
}

let mut encoder = RawEncoder::default();
update.write(&mut encoder)?;
let update = encoder.into_inner();
debug!("step3 return changed update: {}", update.len());
Ok(Some(update))
})
.map_err(|e| warn!("failed to apply update: {:?}", e))
.ok()
.flatten()
.map(|u| SyncMessage::Doc(DocMessage::Update(u))),
DocMessage::Update(update) => {
let before_state = doc.get_state_vector();
doc.apply_update_from_binary_v1(update)
.and_then(|_| {
// TODO: encode without pending update
let update = doc.encode_state_as_update(&before_state)?;
if update.is_content_empty() {
return Ok(None);
}

let mut encoder = RawEncoder::default();
update.write(&mut encoder)?;
let update = encoder.into_inner();
debug!("step3 return changed update: {}", update.len());
Ok(Some(update))
})
.map_err(|e| warn!("failed to apply update: {:?}", e))
.ok()
.flatten()
.map(|u| SyncMessage::Doc(DocMessage::Update(u)))
}
},
_ => None,
}
Expand Down
24 changes: 12 additions & 12 deletions libs/jwst-core/src/workspaces/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl Workspace {

pub fn from_binary(binary: Vec<u8>, workspace_id: &str) -> JwstResult<Self> {
let mut doc = Doc::default();
doc.apply_update_from_binary(binary)?;
doc.apply_update_from_binary_v1(binary)?;
Self::from_doc(doc, workspace_id)
}

Expand Down Expand Up @@ -137,7 +137,7 @@ mod test {
let update = doc.encode_state_as_update_v1(&StateVector::default()).unwrap();

let mut doc = Doc::default();
doc.apply_update_from_binary(update).unwrap();
doc.apply_update_from_binary_v1(update).unwrap();
doc
};

Expand Down Expand Up @@ -240,7 +240,7 @@ mod test {
let data = doc.encode_state_as_update_v1(&StateVector::default()).unwrap();

let mut doc = Doc::default();
doc.apply_update_from_binary(data).unwrap();
doc.apply_update_from_binary_v1(data).unwrap();

assert_eq!(doc.keys(), vec!["test"]);
}
Expand All @@ -260,7 +260,7 @@ mod test {

for _ in 0..=2 {
let mut doc = Doc::default();
doc.apply_update_from_binary(update).unwrap();
doc.apply_update_from_binary_v1(update).unwrap();

update = doc.encode_state_as_update_v1(&StateVector::default()).unwrap();
}
Expand All @@ -280,7 +280,7 @@ mod test {
};
let update1 = {
let mut doc = Doc::default();
doc.apply_update_from_binary(update.clone()).unwrap();
doc.apply_update_from_binary_v1(update.clone()).unwrap();
let mut ws = Workspace::from_doc(doc, "test").unwrap();
{
let mut space = ws.get_space("space").unwrap();
Expand All @@ -293,7 +293,7 @@ mod test {
};
let update2 = {
let mut doc = Doc::default();
doc.apply_update_from_binary(update).unwrap();
doc.apply_update_from_binary_v1(update).unwrap();
let mut ws = Workspace::from_doc(doc, "test").unwrap();
{
let mut space = ws.get_space("space").unwrap();
Expand All @@ -307,8 +307,8 @@ mod test {

{
let mut doc = Doc::default();
doc.apply_update_from_binary(update1.clone()).unwrap();
doc.apply_update_from_binary(update2.clone()).unwrap();
doc.apply_update_from_binary_v1(update1.clone()).unwrap();
doc.apply_update_from_binary_v1(update2.clone()).unwrap();

let mut ws = Workspace::from_doc(doc, "test").unwrap();
let block = {
Expand Down Expand Up @@ -361,7 +361,7 @@ mod test {

let update1 = {
let mut doc = Doc::default();
doc.apply_update_from_binary(update.clone()).unwrap();
doc.apply_update_from_binary_v1(update.clone()).unwrap();
let mut ws = Workspace::from_doc(doc, "test").unwrap();
{
let mut space = ws.get_space("space").unwrap();
Expand All @@ -374,7 +374,7 @@ mod test {
};
let update2 = {
let mut doc = Doc::default();
doc.apply_update_from_binary(update).unwrap();
doc.apply_update_from_binary_v1(update).unwrap();
let mut ws = Workspace::from_doc(doc, "test").unwrap();
{
let mut space = ws.get_space("space").unwrap();
Expand All @@ -388,8 +388,8 @@ mod test {

{
let mut doc = Doc::default();
doc.apply_update_from_binary(update1.clone()).unwrap();
doc.apply_update_from_binary(update2.clone()).unwrap();
doc.apply_update_from_binary_v1(update1.clone()).unwrap();
doc.apply_update_from_binary_v1(update2.clone()).unwrap();

let mut ws = Workspace::from_doc(doc, "test").unwrap();
let block = {
Expand Down
4 changes: 2 additions & 2 deletions libs/jwst-rpc/src/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;

use jwst_codec::{encode_awareness_as_message, encode_update_as_message, encode_update_with_guid};
use jwst_codec::{encode_awareness_as_message, encode_update_as_message};
use jwst_core::Workspace;
use tokio::sync::{broadcast::Sender, RwLock};

Expand Down Expand Up @@ -53,7 +53,7 @@ pub async fn subscribe(workspace: &Workspace, identifier: String, sender: Broadc
history.len()
);

match encode_update_with_guid(update.to_vec(), workspace_id.clone())
match encode_update_with_guid(update, workspace_id.clone())
.and_then(|update_with_guid| encode_update_as_message(update.to_vec()).map(|u| (update_with_guid, u)))
{
Ok((broadcast_update, sendable_update)) => {
Expand Down
6 changes: 3 additions & 3 deletions libs/jwst-rpc/src/connector/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
thread::JoinHandle as StdJoinHandler,
};

use jwst_codec::{decode_update_with_guid, encode_update_as_message, Doc, DocMessage, SyncMessage, SyncMessageScanner};
use jwst_codec::{encode_update_as_message, Doc, DocMessage, SyncMessage, SyncMessageScanner};
use tokio::{runtime::Runtime, sync::mpsc::channel, task::JoinHandle as TokioJoinHandler};

use super::*;
Expand Down Expand Up @@ -85,9 +85,9 @@ pub fn memory_connector(
}
})
}) {
match decode_update_with_guid(update.clone()) {
match decode_update_with_guid(&update) {
Ok((_, update1)) => {
if let Err(e) = doc.apply_update_from_binary(update1) {
if let Err(e) = doc.apply_update_from_binary_v1(update1) {
error!("failed to decode update1: {}, update: {:?}", e, update);
}
}
Expand Down
6 changes: 3 additions & 3 deletions libs/jwst-rpc/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ pub trait RpcContextImpl<'a> {
Ok(data) => match data {
BroadcastType::BroadcastRawContent(update) => {
trace!("receive raw update: {}", update.len());
let mut decoder = RawDecoder::new(update);
let mut decoder = RawDecoder::new(&update);
if let Ok(guid) = decoder.read_var_string() {
match updates.lock().await.entry(guid) {
Entry::Occupied(mut updates) => {
updates.get_mut().push(decoder.drain());
updates.get_mut().push(decoder.drain().into());
}
Entry::Vacant(v) => {
v.insert(vec![decoder.drain()]);
v.insert(vec![decoder.drain().into()]);
}
};
};
Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-rpc/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ mod test {
ws.to_binary().unwrap()
};
// apply update with jwst-codec
doc1.apply_update_from_binary(update).unwrap();
doc1.apply_update_from_binary_v1(&update).unwrap();

// await the task to make sure the doc1 is broadcasted before check doc2
tx_handler.await.unwrap();
Expand Down
1 change: 1 addition & 0 deletions libs/jwst-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use tokio::{
time::{sleep, Duration},
};
pub use utils::{connect_memory_workspace, MinimumServerContext};
use utils::{decode_update_with_guid, encode_update_with_guid};
#[cfg(feature = "webrtc")]
pub use webrtcrs::peer_connection::sdp::session_description::RTCSessionDescription;

Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-rpc/src/utils/memory_workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub async fn connect_memory_workspace(
let rt = Arc::new(tokio::runtime::Runtime::new().unwrap());

let mut doc = Doc::default();
doc.apply_update_from_binary(init_state.to_vec()).unwrap();
doc.apply_update_from_binary_v1(init_state).unwrap();

let (tx, rx, tx_handler, rx_handler) = memory_connector(rt.clone(), doc.clone());
{
Expand Down
23 changes: 23 additions & 0 deletions libs/jwst-rpc/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,30 @@
mod memory_workspace;
mod server_context;

use std::io::Write;

use jwst_codec::{CrdtReader, CrdtWriter, JwstCodecError, JwstCodecResult, RawDecoder, RawEncoder};
pub use memory_workspace::connect_memory_workspace;
pub use server_context::MinimumServerContext;

use super::*;

pub fn encode_update_with_guid<S: AsRef<str>>(update: &[u8], guid: S) -> JwstCodecResult<Vec<u8>> {
let mut encoder = RawEncoder::default();
encoder.write_var_string(guid)?;
let mut buffer = encoder.into_inner();

buffer
.write_all(&update)

Check warning

Code scanning / clippy

this expression creates a reference which is immediately dereferenced by the compiler Warning

this expression creates a reference which is immediately dereferenced by the compiler
.map_err(|e| JwstCodecError::InvalidWriteBuffer(e.to_string()))?;

Ok(buffer)
}

pub fn decode_update_with_guid(update: &[u8]) -> JwstCodecResult<(String, &[u8])> {
let mut decoder = RawDecoder::new(update);
let guid = decoder.read_var_string()?;
let update = decoder.drain();

Ok((guid, update))
}
6 changes: 3 additions & 3 deletions libs/jwst-storage/src/storage/docs/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl DocDBStorage {
Ok(())
}

async fn update<C>(&self, conn: &C, workspace: &str, guid: &str, blob: Vec<u8>) -> JwstStorageResult<()>
async fn update<C>(&self, conn: &C, workspace: &str, guid: &str, blob: &[u8]) -> JwstStorageResult<()>
where
C: ConnectionTrait,
{
Expand All @@ -206,7 +206,7 @@ impl DocDBStorage {
trace!("update {}bytes to {}", blob.len(), guid);
if let Entry::Occupied(remote) = self.remote.write().await.entry(guid.into()) {
let broadcast = &remote.get();
if broadcast.send(encode_update_as_message(blob)?).is_err() {
if broadcast.send(encode_update_as_message(blob.into())?).is_err() {
// broadcast failures are not fatal errors, only warnings are required
warn!("send {guid} update to pipeline failed");
}
Expand Down Expand Up @@ -365,7 +365,7 @@ impl DocStorage<JwstStorageError> for DocDBStorage {

async fn update_doc_with_guid(&self, workspace_id: String, data: &[u8]) -> JwstStorageResult<()> {
trace!("write_update: {:?}", data);
let mut decoder = RawDecoder::new(data.to_vec());
let mut decoder = RawDecoder::new(data);
let guid = decoder.read_var_string()?;

debug!("write_update: get lock");
Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/storage/docs/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub fn migrate_update(update_records: Vec<<Docs as EntityTrait>::Model>, mut doc
doc.publisher.stop();
for record in update_records {
let id = record.created_at;
if let Err(e) = doc.apply_update_from_binary(record.blob) {
if let Err(e) = doc.apply_update_from_binary_v1(&record.blob) {
warn!("update {} merge failed, skip it: {:?}", id, e);
}
}
Expand Down

0 comments on commit 4737ffa

Please sign in to comment.