From 477276a23dc5ab4e2a5f55db5db12afa0fd6983e Mon Sep 17 00:00:00 2001 From: magine Date: Mon, 16 Oct 2023 12:26:18 +0800 Subject: [PATCH 1/2] Split Transaction from MessagePayload --- core/src/chunk.rs | 2 +- core/src/consts.rs | 6 +- core/src/dht/vnode.rs | 10 +- core/src/ecc/mod.rs | 6 +- core/src/ecc/signers/bip137.rs | 15 +- core/src/ecc/signers/ed25519.rs | 11 +- core/src/ecc/signers/eip191.rs | 16 +- core/src/ecc/signers/secp256k1.rs | 12 +- core/src/message/handlers/connection.rs | 155 +++++----- core/src/message/handlers/custom.rs | 3 +- core/src/message/handlers/dht.rs | 2 +- core/src/message/handlers/mod.rs | 83 +++--- core/src/message/handlers/stabilization.rs | 81 +++--- core/src/message/handlers/storage.rs | 30 +- core/src/message/mod.rs | 3 +- core/src/message/payload.rs | 313 ++++++++++----------- core/src/message/protocols/mod.rs | 1 + core/src/message/protocols/verify.rs | 99 +++++-- core/src/session.rs | 38 +-- core/src/swarm/impls.rs | 27 +- core/src/swarm/mod.rs | 18 +- node/src/backend/extension.rs | 2 +- node/src/backend/service/http_server.rs | 2 +- node/src/backend/service/mod.rs | 5 +- node/src/backend/service/text.rs | 2 +- node/src/backend/service/utils.rs | 2 +- node/src/backend/types.rs | 2 +- node/src/browser/client.rs | 22 +- node/src/jsonrpc/server.rs | 5 +- node/src/native/endpoint/mod.rs | 2 +- node/src/processor.rs | 11 +- node/src/tests/wasm/processor.rs | 4 +- rpc/src/jsonrpc_client/client.rs | 2 +- 33 files changed, 516 insertions(+), 476 deletions(-) diff --git a/core/src/chunk.rs b/core/src/chunk.rs index 6ff1954e5..fbc22694b 100644 --- a/core/src/chunk.rs +++ b/core/src/chunk.rs @@ -64,7 +64,7 @@ pub struct ChunkMeta { /// Created time pub ts_ms: u128, /// Time to live - pub ttl_ms: usize, + pub ttl_ms: u64, } impl Default for ChunkMeta { diff --git a/core/src/consts.rs b/core/src/consts.rs index 18cff7d33..bf052a676 100644 --- a/core/src/consts.rs +++ b/core/src/consts.rs @@ -1,10 +1,10 @@ //! Constant variables. /// /// default ttl in ms -pub const DEFAULT_TTL_MS: usize = 300 * 1000; -pub const MAX_TTL_MS: usize = DEFAULT_TTL_MS * 10; +pub const DEFAULT_TTL_MS: u64 = 300 * 1000; +pub const MAX_TTL_MS: u64 = DEFAULT_TTL_MS * 10; pub const TS_OFFSET_TOLERANCE_MS: u128 = 3000; -pub const DEFAULT_SESSION_TTL_MS: usize = 30 * 24 * 3600 * 1000; +pub const DEFAULT_SESSION_TTL_MS: u64 = 30 * 24 * 3600 * 1000; pub const TRANSPORT_MTU: usize = 60000; pub const TRANSPORT_MAX_SIZE: usize = TRANSPORT_MTU * 16; pub const VNODE_DATA_MAX_LEN: usize = 1024; diff --git a/core/src/dht/vnode.rs b/core/src/dht/vnode.rs index 2ad161224..2dcc785ee 100644 --- a/core/src/dht/vnode.rs +++ b/core/src/dht/vnode.rs @@ -3,7 +3,6 @@ use std::cmp::max; use std::str::FromStr; use num_bigint::BigUint; -use serde::de::DeserializeOwned; use serde::Deserialize; use serde::Serialize; @@ -16,6 +15,7 @@ use crate::error::Result; use crate::message::Encoded; use crate::message::Encoder; use crate::message::MessagePayload; +use crate::message::MessageVerificationExt; /// VNode Types #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -108,12 +108,10 @@ impl VNodeOperation { } } -impl TryFrom> for VirtualNode -where T: Serialize + DeserializeOwned -{ +impl TryFrom for VirtualNode { type Error = Error; - fn try_from(msg: MessagePayload) -> Result { - let did = BigUint::from(msg.addr) + BigUint::from(1u16); + fn try_from(msg: MessagePayload) -> Result { + let did = BigUint::from(msg.signer()) + BigUint::from(1u16); let data = msg.encode()?; Ok(Self { did: did.into(), diff --git a/core/src/ecc/mod.rs b/core/src/ecc/mod.rs index c97d6c7b2..de2e8659b 100644 --- a/core/src/ecc/mod.rs +++ b/core/src/ecc/mod.rs @@ -272,10 +272,10 @@ impl PublicKey { } /// Recover PublicKey from RawMessage using signature. -pub fn recover(message: &str, signature: S) -> Result +pub fn recover(message: &[u8], signature: S) -> Result where S: AsRef<[u8]> { let sig_bytes: SigBytes = signature.as_ref().try_into()?; - let message_hash: [u8; 32] = keccak256(message.as_bytes()); + let message_hash: [u8; 32] = keccak256(message); recover_hash(&message_hash, &sig_bytes) } @@ -353,7 +353,7 @@ pub mod tests { fn test_recover() { let key = SecretKey::random(); let pubkey1 = key.pubkey(); - let pubkey2 = recover("hello", key.sign("hello")).unwrap(); + let pubkey2 = recover("hello".as_bytes(), key.sign("hello")).unwrap(); assert_eq!(pubkey1, pubkey2); } diff --git a/core/src/ecc/signers/bip137.rs b/core/src/ecc/signers/bip137.rs index 568a941da..e805a00b2 100644 --- a/core/src/ecc/signers/bip137.rs +++ b/core/src/ecc/signers/bip137.rs @@ -9,7 +9,7 @@ use crate::ecc::PublicKeyAddress; use crate::error::Result; /// recover pubkey according to signature. -pub fn recover(msg: &str, sig: impl AsRef<[u8]>) -> Result { +pub fn recover(msg: &[u8], sig: impl AsRef<[u8]>) -> Result { let mut sig = sig.as_ref().to_vec(); sig.rotate_left(1); let sig = sig.as_mut_slice(); @@ -20,7 +20,7 @@ pub fn recover(msg: &str, sig: impl AsRef<[u8]>) -> Result { } /// verify message signed by Ethereum address. -pub fn verify(msg: &str, address: &PublicKeyAddress, sig: impl AsRef<[u8]>) -> bool { +pub fn verify(msg: &[u8], address: &PublicKeyAddress, sig: impl AsRef<[u8]>) -> bool { match recover(msg, sig.as_ref()) { Ok(recover_pk) => { if recover_pk.address() == *address { @@ -35,7 +35,7 @@ pub fn verify(msg: &str, address: &PublicKeyAddress, sig: impl AsRef<[u8]>) -> b } Err(e) => { tracing::debug!( - "failed to recover pubkey: {:?}\nmsg: {}\nsig:{:?}", + "failed to recover pubkey: {:?}\nmsg: {:?}\nsig:{:?}", e, msg, sig.as_ref(), @@ -67,14 +67,13 @@ fn varint_buf_num(n: u64) -> Vec { } } -pub fn magic_hash(msg: &str) -> [u8; 32] { +pub fn magic_hash(msg: &[u8]) -> [u8; 32] { let magic_bytes = "Bitcoin Signed Message:\n".as_bytes(); - let msg_bytes = msg.as_bytes(); let mut buf = Vec::new(); buf.extend_from_slice(varint_buf_num(magic_bytes.len() as u64).as_slice()); buf.extend_from_slice(magic_bytes); - buf.extend_from_slice(varint_buf_num(msg_bytes.len() as u64).as_slice()); - buf.extend_from_slice(msg_bytes); + buf.extend_from_slice(varint_buf_num(msg.len() as u64).as_slice()); + buf.extend_from_slice(msg); let hash = Sha256::digest(Sha256::digest(&buf)); hash.into() } @@ -100,7 +99,7 @@ mod test { ]; assert_eq!(sig.len(), 65); - let pk = self::recover(msg, sig).unwrap(); + let pk = self::recover(msg.as_bytes(), sig).unwrap(); assert_eq!(pk, pubkey); assert_eq!(pk.address(), pubkey.address()); } diff --git a/core/src/ecc/signers/ed25519.rs b/core/src/ecc/signers/ed25519.rs index 54114a7b9..bea05c535 100644 --- a/core/src/ecc/signers/ed25519.rs +++ b/core/src/ecc/signers/ed25519.rs @@ -6,7 +6,7 @@ use crate::ecc::PublicKeyAddress; /// ref pub fn verify( - msg: &str, + msg: &[u8], address: &PublicKeyAddress, sig: impl AsRef<[u8]>, pubkey: PublicKey, @@ -22,7 +22,7 @@ pub fn verify( TryInto::::try_into(pubkey), ed25519_dalek::Signature::from_bytes(&sig_data), ) { - match p.verify(msg.as_bytes(), &s) { + match p.verify(msg, &s) { Ok(()) => true, Err(_) => false, } @@ -54,6 +54,11 @@ mod test { PublicKey::try_from_b58t("9z1ZTaGocNSAu3DSqGKR6Dqt214X4dXucVd6C53EgqBK").unwrap(); let sig_b58 = "2V1AR5byk4a4CkVmFRWU1TVs3ns2CGkuq6xgGju1huGQGq5hGkiHUDjEaJJaL2txfqCSGnQW55jUJpcjKFkZEKq"; let sig: Vec = base58::FromBase58::from_base58(sig_b58).unwrap(); - assert!(self::verify(msg, &signer.address(), sig.as_slice(), signer)) + assert!(self::verify( + msg.as_bytes(), + &signer.address(), + sig.as_slice(), + signer + )) } } diff --git a/core/src/ecc/signers/eip191.rs b/core/src/ecc/signers/eip191.rs index eb16cb9be..959470d31 100644 --- a/core/src/ecc/signers/eip191.rs +++ b/core/src/ecc/signers/eip191.rs @@ -8,7 +8,7 @@ use crate::ecc::SecretKey; use crate::error::Result; /// sign function passing raw message parameter. -pub fn sign_raw(sec: SecretKey, msg: &str) -> [u8; 65] { +pub fn sign_raw(sec: SecretKey, msg: &[u8]) -> [u8; 65] { sign(sec, &hash(msg)) } @@ -20,14 +20,14 @@ pub fn sign(sec: SecretKey, hash: &[u8; 32]) -> [u8; 65] { } /// \x19Ethereum Signed Message\n is used for PersonalSign, which can encode by send `personalSign` rpc call. -pub fn hash(msg: &str) -> [u8; 32] { +pub fn hash(msg: &[u8]) -> [u8; 32] { let mut prefix_msg = format!("\x19Ethereum Signed Message:\n{}", msg.len()).into_bytes(); - prefix_msg.extend_from_slice(msg.as_bytes()); + prefix_msg.extend_from_slice(msg); keccak256(&prefix_msg) } /// recover pubkey according to signature. -pub fn recover(msg: &str, sig: impl AsRef<[u8]>) -> Result { +pub fn recover(msg: &[u8], sig: impl AsRef<[u8]>) -> Result { let sig_byte: [u8; 65] = sig.as_ref().try_into()?; let hash = hash(msg); let mut sig712 = sig_byte; @@ -36,7 +36,7 @@ pub fn recover(msg: &str, sig: impl AsRef<[u8]>) -> Result { } /// verify message signed by Ethereum address. -pub fn verify(msg: &str, address: &PublicKeyAddress, sig: impl AsRef<[u8]>) -> bool { +pub fn verify(msg: &[u8], address: &PublicKeyAddress, sig: impl AsRef<[u8]>) -> bool { if let Ok(p) = recover(msg, sig) { p.address() == *address } else { @@ -63,11 +63,11 @@ mod test { // window.ethereum.request({method: "personal_sign", params: ["test", "0x11E807fcc88dD319270493fB2e822e388Fe36ab0"]}) let metamask_sig = Vec::from_hex("724fc31d9272b34d8406e2e3a12a182e72510b008de6cc44684577e31e20d9626fb760d6a0badd79a6cf4cd56b2fc0fbd60c438b809aa7d29bfb598c13e7b50e1b").unwrap(); let msg = "test"; - let h = self::hash(msg); + let h = self::hash(msg.as_bytes()); let sig = self::sign(key, &h); assert_eq!(metamask_sig.as_slice(), sig); - let pubkey = self::recover(msg, sig).unwrap(); + let pubkey = self::recover(msg.as_bytes(), sig).unwrap(); assert_eq!(pubkey.address(), address); - assert!(self::verify(msg, &address, sig)); + assert!(self::verify(msg.as_bytes(), &address, sig)); } } diff --git a/core/src/ecc/signers/secp256k1.rs b/core/src/ecc/signers/secp256k1.rs index efb1c15ba..25b2f2b77 100644 --- a/core/src/ecc/signers/secp256k1.rs +++ b/core/src/ecc/signers/secp256k1.rs @@ -7,7 +7,7 @@ use crate::ecc::SecretKey; use crate::error::Result; /// sign function passing raw message parameter. -pub fn sign_raw(sec: SecretKey, msg: &str) -> [u8; 65] { +pub fn sign_raw(sec: SecretKey, msg: &[u8]) -> [u8; 65] { sign(sec, &hash(msg)) } @@ -17,18 +17,18 @@ pub fn sign(sec: SecretKey, hash: &[u8; 32]) -> [u8; 65] { } /// generate hash data from message. -pub fn hash(msg: &str) -> [u8; 32] { - keccak256(msg.as_bytes()) +pub fn hash(msg: &[u8]) -> [u8; 32] { + keccak256(msg) } /// recover public key from message and signature. -pub fn recover(msg: &str, sig: impl AsRef<[u8]>) -> Result { +pub fn recover(msg: &[u8], sig: impl AsRef<[u8]>) -> Result { let sig_byte: [u8; 65] = sig.as_ref().try_into()?; crate::ecc::recover(msg, sig_byte) } /// verify signature with message and address. -pub fn verify(msg: &str, address: &PublicKeyAddress, sig: impl AsRef<[u8]>) -> bool { +pub fn verify(msg: &[u8], address: &PublicKeyAddress, sig: impl AsRef<[u8]>) -> bool { if let Ok(p) = recover(msg, sig) { p.address() == *address } else { @@ -48,7 +48,7 @@ mod test { .unwrap(); let msg = "hello"; - let h = self::hash(msg); + let h = self::hash(msg.as_bytes()); let sig = self::sign(key, &h); assert_eq!(sig, key.sign(msg)); } diff --git a/core/src/message/handlers/connection.rs b/core/src/message/handlers/connection.rs index 8f578edf7..d626cc192 100644 --- a/core/src/message/handlers/connection.rs +++ b/core/src/message/handlers/connection.rs @@ -32,7 +32,7 @@ use crate::message::MessagePayload; impl HandleMsg for MessageHandler { async fn handle( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, msg: &QueryForTopoInfoSend, ) -> Result> { let info: TopoInfo = TopoInfo::try_from(self.dht.deref())?; @@ -53,7 +53,7 @@ impl HandleMsg for MessageHandler { impl HandleMsg for MessageHandler { async fn handle( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, msg: &QueryForTopoInfoReport, ) -> Result> { match msg.then { @@ -76,7 +76,7 @@ impl HandleMsg for MessageHandler { impl HandleMsg for MessageHandler { async fn handle( &self, - _ctx: &MessagePayload, + _ctx: &MessagePayload, msg: &LeaveDHT, ) -> Result> { Ok(vec![MessageHandlerEvent::Disconnect(msg.did)]) @@ -88,7 +88,7 @@ impl HandleMsg for MessageHandler { impl HandleMsg for MessageHandler { async fn handle( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, msg: &JoinDHT, ) -> Result> { // here is two situation. @@ -105,7 +105,7 @@ impl HandleMsg for MessageHandler { impl HandleMsg for MessageHandler { async fn handle( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, msg: &ConnectNodeSend, ) -> Result> { if self.dht.did != ctx.relay.destination { @@ -124,7 +124,7 @@ impl HandleMsg for MessageHandler { impl HandleMsg for MessageHandler { async fn handle( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, msg: &ConnectNodeReport, ) -> Result> { if self.dht.did != ctx.relay.destination { @@ -143,7 +143,7 @@ impl HandleMsg for MessageHandler { impl HandleMsg for MessageHandler { async fn handle( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, msg: &FindSuccessorSend, ) -> Result> { match self.dht.find_successor(msg.did)? { @@ -183,7 +183,7 @@ impl HandleMsg for MessageHandler { impl HandleMsg for MessageHandler { async fn handle( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, msg: &FindSuccessorReport, ) -> Result> { if self.dht.did != ctx.relay.destination { @@ -216,6 +216,7 @@ pub mod tests { use crate::ecc::tests::gen_ordered_keys; use crate::ecc::SecretKey; use crate::message::handlers::tests::assert_no_more_msg; + use crate::message::MessageVerificationExt; use crate::swarm::Swarm; use crate::tests::default::prepare_node; use crate::tests::manually_establish_connection; @@ -379,10 +380,10 @@ pub mod tests { // so node2 pick node3 as node3's successor // let ev_3 = node3.listen_once().await.unwrap().0; - assert_eq!(ev_3.addr, node2.did()); + assert_eq!(ev_3.signer(), node2.did()); assert_eq!(ev_3.relay.path, vec![node2.did()]); assert!(matches!( - ev_3.data, + ev_3.transaction.data()?, Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node3.did() )); // dht3 won't set did3 as successor @@ -391,11 +392,11 @@ pub mod tests { // 3->2 FindSuccessorReport // node3 report node2 as node2's successor to node2 let ev_2 = node2.listen_once().await.unwrap().0; - assert_eq!(ev_2.addr, node3.did()); + assert_eq!(ev_2.signer(), node3.did()); assert_eq!(ev_2.relay.path, vec![node3.did()]); // node3 is only aware of node2, so it respond node2 assert!(matches!( - ev_2.data, + ev_2.transaction.data()?, Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node2.did() )); // dht2 won't set did2 as successor @@ -422,20 +423,20 @@ pub mod tests { // 3->1->2 FindSuccessorSend let ev_2 = node2.listen_once().await.unwrap().0; - assert_eq!(ev_2.addr, node1.did()); + assert_eq!(ev_2.signer(), node1.did()); assert_eq!(ev_2.relay.path, vec![node3.did(), node1.did()]); assert!(matches!( - ev_2.data, + ev_2.transaction.data()?, Message::FindSuccessorSend(FindSuccessorSend{did, strict: false, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect)}) if did == node3.did() )); // 3->1 FindSuccessorReport // node3 report node1 as node1's successor to node1 let ev_1 = node1.listen_once().await.unwrap().0; - assert_eq!(ev_1.addr, node3.did()); + assert_eq!(ev_1.signer(), node3.did()); assert_eq!(ev_1.relay.path, vec![node3.did()]); assert!(matches!( - ev_1.data, + ev_1.transaction.data()?, Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node1.did() )); // dht1 won't set did1 as successor @@ -443,15 +444,15 @@ pub mod tests { // 2->1 FindSuccessorReport let ev_1 = node1.listen_once().await.unwrap().0; - assert_eq!(ev_1.addr, node2.did()); + assert_eq!(ev_1.signer(), node2.did()); assert_eq!(ev_1.relay.path, vec![node2.did()]); // 2->1->3 FindSuccessorReport let ev_3 = node3.listen_once().await.unwrap().0; - assert_eq!(ev_3.addr, node1.did()); + assert_eq!(ev_3.signer(), node1.did()); assert_eq!(ev_3.relay.path, vec![node2.did(), node1.did()]); assert!(matches!( - ev_3.data, + ev_3.transaction.data()?, Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node3.did() )); // dht3 won't set did3 as successor @@ -525,21 +526,21 @@ pub mod tests { // so node2 pick node1 to find_successor. // let ev_1 = node1.listen_once().await.unwrap().0; - assert_eq!(ev_1.addr, node2.did()); + assert_eq!(ev_1.signer(), node2.did()); assert_eq!(ev_1.relay.path, vec![node3.did(), node2.did()]); assert!(matches!( - ev_1.data, + ev_1.transaction.data()?, Message::FindSuccessorSend(FindSuccessorSend{did, strict: false, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect)}) if did == node3.did() )); // 3->2 FindSuccessorReport // node3 report node2 as node2's successor to node2 let ev_2 = node2.listen_once().await.unwrap().0; - assert_eq!(ev_2.addr, node3.did()); + assert_eq!(ev_2.signer(), node3.did()); assert_eq!(ev_2.relay.path, vec![node3.did()]); // node3 is only aware of node2, so it respond node2 assert!(matches!( - ev_2.data, + ev_2.transaction.data()?, Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node2.did() )); // dht2 won't set did2 as successor @@ -548,21 +549,21 @@ pub mod tests { // 1->2 FindSuccessorReport // node1 report node2 as node3's successor to node2 let ev_2 = node2.listen_once().await.unwrap().0; - assert_eq!(ev_2.addr, node1.did()); + assert_eq!(ev_2.signer(), node1.did()); assert_eq!(ev_2.relay.path, vec![node1.did()]); // node1 is only aware of node2, so it respond node2 assert!(matches!( - ev_2.data, + ev_2.transaction.data()?, Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node2.did() )); // 1->2->3 FindSuccessorReport // node2 relay report to node3 let ev_3 = node3.listen_once().await.unwrap().0; - assert_eq!(ev_3.addr, node2.did()); + assert_eq!(ev_3.signer(), node2.did()); assert_eq!(ev_3.relay.path, vec![node1.did(), node2.did()]); assert!(matches!( - ev_3.data, + ev_3.transaction.data()?, Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node2.did() )); @@ -587,20 +588,20 @@ pub mod tests { // 1->3->2 FindSuccessorSend let ev_2 = node2.listen_once().await.unwrap().0; - assert_eq!(ev_2.addr, node3.did()); + assert_eq!(ev_2.signer(), node3.did()); assert_eq!(ev_2.relay.path, vec![node1.did(), node3.did()]); assert!(matches!( - ev_2.data, + ev_2.transaction.data()?, Message::FindSuccessorSend(FindSuccessorSend{did, strict: false, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect)}) if did == node1.did() )); // 1->3 FindSuccessorReport // node1 report node3 as node3's successor to node1 let ev_3 = node3.listen_once().await.unwrap().0; - assert_eq!(ev_3.addr, node1.did()); + assert_eq!(ev_3.signer(), node1.did()); assert_eq!(ev_3.relay.path, vec![node1.did()]); assert!(matches!( - ev_3.data, + ev_3.transaction.data()?, Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node3.did() )); // dht3 won't set did3 as successor @@ -608,15 +609,15 @@ pub mod tests { // 2->3 FindSuccessorReport let ev_3 = node3.listen_once().await.unwrap().0; - assert_eq!(ev_3.addr, node2.did()); + assert_eq!(ev_3.signer(), node2.did()); assert_eq!(ev_3.relay.path, vec![node2.did()]); // 2->3->1 FindSuccessorReport let ev_1 = node1.listen_once().await.unwrap().0; - assert_eq!(ev_1.addr, node3.did()); + assert_eq!(ev_1.signer(), node3.did()); assert_eq!(ev_1.relay.path, vec![node2.did(), node3.did()]); assert!(matches!( - ev_1.data, + ev_1.transaction.data()?, Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node1.did() )); // dht1 won't set did1 as successor @@ -651,28 +652,32 @@ pub mod tests { ) -> Result<()> { // 1 JoinDHT let ev_1 = node1.listen_once().await.unwrap().0; - assert_eq!(ev_1.addr, node1.did()); + assert_eq!(ev_1.signer(), node1.did()); assert_eq!(ev_1.relay.path, vec![node1.did()]); - assert!(matches!(ev_1.data, Message::JoinDHT(JoinDHT{did, ..}) if did == node2.did())); + assert!( + matches!(ev_1.transaction.data()?, Message::JoinDHT(JoinDHT{did, ..}) if did == node2.did()) + ); // 2 JoinDHT let ev_2 = node2.listen_once().await.unwrap().0; - assert_eq!(ev_2.addr, node2.did()); + assert_eq!(ev_2.signer(), node2.did()); assert_eq!(ev_2.relay.path, vec![node2.did()]); - assert!(matches!(ev_2.data, Message::JoinDHT(JoinDHT{did, ..}) if did == node1.did())); + assert!( + matches!(ev_2.transaction.data()?, Message::JoinDHT(JoinDHT{did, ..}) if did == node1.did()) + ); // 1->2 FindSuccessorSend let ev_1 = node1.listen_once().await.unwrap().0; - assert_eq!(ev_1.addr, node2.did()); + assert_eq!(ev_1.signer(), node2.did()); assert_eq!(ev_1.relay.path, vec![node2.did()]); assert!(matches!( - ev_1.data, + ev_1.transaction.data()?, Message::FindSuccessorSend(FindSuccessorSend{did, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect), strict: false}) if did == node2.did() )); // 2->1 FindSuccessorSend let ev_2 = node2.listen_once().await.unwrap().0; - assert_eq!(ev_2.addr, node1.did()); + assert_eq!(ev_2.signer(), node1.did()); assert_eq!(ev_2.relay.path, vec![node1.did()]); assert!(matches!( - ev_2.data, + ev_2.transaction.data()?, Message::FindSuccessorSend(FindSuccessorSend{did, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect), strict: false}) if did == node1.did() )); @@ -689,11 +694,11 @@ pub mod tests { // 2->1 FindSuccessorReport // node2 report node1 as node1's successor to node1 let ev_1 = node1.listen_once().await.unwrap().0; - assert_eq!(ev_1.addr, node2.did()); + assert_eq!(ev_1.signer(), node2.did()); assert_eq!(ev_1.relay.path, vec![node2.did()]); // node2 is only aware of node1, so it respond node1 assert!(matches!( - ev_1.data, + ev_1.transaction.data()?, Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node1.did() )); // dht1 won't set dhd1 as successor @@ -702,11 +707,11 @@ pub mod tests { // 1->2 FindSuccessorReport // node1 report node2 as node2's successor to node2 let ev_2 = node2.listen_once().await.unwrap().0; - assert_eq!(ev_2.addr, node1.did()); + assert_eq!(ev_2.signer(), node1.did()); assert_eq!(ev_2.relay.path, vec![node1.did()]); // node1 is only aware of node2, so it respond node2 assert!(matches!( - ev_2.data, + ev_2.transaction.data()?, Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node2.did() )); // dht2 won't set did2 as successor @@ -730,57 +735,73 @@ pub mod tests { // node1 send msg to node2 let ev2 = node2.listen_once().await.unwrap().0; - assert_eq!(ev2.addr, node1.did()); + assert_eq!(ev2.signer(), node1.did()); assert_eq!(ev2.relay.path, vec![node1.did()]); - assert!(matches!(ev2.data, Message::ConnectNodeSend(_))); + assert!(matches!( + ev2.transaction.data()?, + Message::ConnectNodeSend(_) + )); // node2 relay msg to node3 let ev3 = node3.listen_once().await.unwrap().0; - assert_eq!(ev3.addr, node2.did()); + assert_eq!(ev3.signer(), node2.did()); assert_eq!(ev3.relay.path, vec![node1.did(), node2.did()]); - assert!(matches!(ev3.data, Message::ConnectNodeSend(_))); + assert!(matches!( + ev3.transaction.data()?, + Message::ConnectNodeSend(_) + )); // node3 send report to node2 let ev2 = node2.listen_once().await.unwrap().0; - assert_eq!(ev2.addr, node3.did()); + assert_eq!(ev2.signer(), node3.did()); assert_eq!(ev2.relay.path, vec![node3.did()]); - assert!(matches!(ev2.data, Message::ConnectNodeReport(_))); + assert!(matches!( + ev2.transaction.data()?, + Message::ConnectNodeReport(_) + )); // node 2 relay report to node1 let ev1 = node1.listen_once().await.unwrap().0; - assert_eq!(ev1.addr, node2.did()); + assert_eq!(ev1.signer(), node2.did()); assert_eq!(ev1.relay.path, vec![node3.did(), node2.did()]); - assert!(matches!(ev1.data, Message::ConnectNodeReport(_))); + assert!(matches!( + ev1.transaction.data()?, + Message::ConnectNodeReport(_) + )); // The following are communications after successful connection // 1 JoinDHT let ev_1 = node1.listen_once().await.unwrap().0; - assert_eq!(ev_1.addr, node1.did()); + assert_eq!(ev_1.signer(), node1.did()); assert_eq!(ev_1.relay.path, vec![node1.did()]); - assert!(matches!(ev_1.data, Message::JoinDHT(JoinDHT{did, ..}) if did == node3.did())); + assert!( + matches!(ev_1.transaction.data()?, Message::JoinDHT(JoinDHT{did, ..}) if did == node3.did()) + ); // 3 JoinDHT let ev_3 = node3.listen_once().await.unwrap().0; - assert_eq!(ev_3.addr, node3.did()); + assert_eq!(ev_3.signer(), node3.did()); assert_eq!(ev_3.relay.path, vec![node3.did()]); - assert!(matches!(ev_3.data, Message::JoinDHT(JoinDHT{did, ..}) if did == node1.did())); + assert!( + matches!(ev_3.transaction.data()?, Message::JoinDHT(JoinDHT{did, ..}) if did == node1.did()) + ); // 3->1 FindSuccessorSend let ev_1 = node1.listen_once().await.unwrap().0; - assert_eq!(ev_1.addr, node3.did()); + assert_eq!(ev_1.signer(), node3.did()); assert_eq!(ev_1.relay.path, vec![node3.did()]); assert!(matches!( - ev_1.data, + ev_1.transaction.data()?, Message::FindSuccessorSend(FindSuccessorSend{did, strict: false, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect)}) if did == node3.did() )); // 1->3 FindSuccessorSend let ev_3 = node3.listen_once().await.unwrap().0; - assert_eq!(ev_3.addr, node1.did()); + assert_eq!(ev_3.signer(), node1.did()); assert_eq!(ev_3.relay.path, vec![node1.did()]); assert!(matches!( - ev_3.data, + ev_3.transaction.data()?, Message::FindSuccessorSend(FindSuccessorSend{did, strict: false, then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect)}) if did == node1.did() )); @@ -939,7 +960,9 @@ pub mod tests { node1.disconnect(node2.did()).await?; let ev1 = node1.listen_once().await.unwrap().0; - assert!(matches!(ev1.data, Message::LeaveDHT(LeaveDHT{did}) if did == node2.did())); + assert!( + matches!(ev1.transaction.data()?, Message::LeaveDHT(LeaveDHT{did}) if did == node2.did()) + ); #[cfg(not(feature = "wasm"))] node2.disconnect(node1.did()).await.unwrap(); @@ -958,8 +981,10 @@ pub mod tests { } } let ev2 = node2.listen_once().await.unwrap().0; - assert_eq!(ev2.addr, node2.did()); - assert!(matches!(ev2.data, Message::LeaveDHT(LeaveDHT{did}) if did == node1.did())); + assert_eq!(ev2.signer(), node2.did()); + assert!( + matches!(ev2.transaction.data()?, Message::LeaveDHT(LeaveDHT{did}) if did == node1.did()) + ); assert_no_more_msg(&node1, &node2, &node3).await; diff --git a/core/src/message/handlers/custom.rs b/core/src/message/handlers/custom.rs index 5c7bee81f..08acfafad 100644 --- a/core/src/message/handlers/custom.rs +++ b/core/src/message/handlers/custom.rs @@ -2,7 +2,6 @@ use async_trait::async_trait; use crate::error::Result; use crate::message::types::CustomMessage; -use crate::message::types::Message; use crate::message::HandleMsg; use crate::message::MessageHandler; use crate::message::MessageHandlerEvent; @@ -13,7 +12,7 @@ use crate::message::MessagePayload; impl HandleMsg for MessageHandler { async fn handle( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, _: &CustomMessage, ) -> Result> { if self.dht.did != ctx.relay.destination { diff --git a/core/src/message/handlers/dht.rs b/core/src/message/handlers/dht.rs index d53bd9b39..6acb23210 100644 --- a/core/src/message/handlers/dht.rs +++ b/core/src/message/handlers/dht.rs @@ -60,7 +60,7 @@ macro_rules! handle_multi_actions { #[cfg_attr(not(feature = "wasm"), async_recursion)] pub async fn handle_dht_events( act: &PeerRingAction, - ctx: &MessagePayload, + ctx: &MessagePayload, ) -> Result> { match act { PeerRingAction::None => Ok(vec![]), diff --git a/core/src/message/handlers/mod.rs b/core/src/message/handlers/mod.rs index bad80170c..f695699df 100644 --- a/core/src/message/handlers/mod.rs +++ b/core/src/message/handlers/mod.rs @@ -37,9 +37,6 @@ pub mod storage; /// Operator and Handler for Subring pub mod subring; -/// Type alias for message payload. -pub type Payload = MessagePayload; - /// Trait of message callback. #[cfg_attr(feature = "wasm", async_trait(?Send))] #[cfg_attr(not(feature = "wasm"), async_trait)] @@ -47,11 +44,11 @@ pub trait MessageCallback { /// Message handler for custom message async fn custom_message( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, msg: &CustomMessage, ) -> Vec; /// Message handler for builtin message - async fn builtin_message(&self, ctx: &MessagePayload) -> Vec; + async fn builtin_message(&self, ctx: &MessagePayload) -> Vec; } /// Trait of message validator. @@ -59,7 +56,7 @@ pub trait MessageCallback { #[cfg_attr(not(feature = "wasm"), async_trait)] pub trait MessageValidator { /// Externality validator - async fn validate(&self, ctx: &MessagePayload) -> Option; + async fn validate(&self, ctx: &MessagePayload) -> Option; } /// Boxed Callback, for non-wasm, it should be Sized, Send and Sync. @@ -92,7 +89,7 @@ pub enum MessageHandlerEvent { /// Instructs the swarm to answer an offer inside payload by given /// sender's Did and Message. - AnswerOffer(Payload, ConnectNodeSend), + AnswerOffer(MessagePayload, ConnectNodeSend), /// Instructs the swarm to accept an answer inside payload by given /// sender's Did and Message. @@ -100,10 +97,10 @@ pub enum MessageHandlerEvent { /// Tell swarm to forward the payload to destination by given /// Payload and optional next hop. - ForwardPayload(Payload, Option), + ForwardPayload(MessagePayload, Option), /// Instructs the swarm to notify the dht about new peer. - JoinDHT(Payload, Did), + JoinDHT(MessagePayload, Did), /// Instructs the swarm to send a direct message to a peer. SendDirectMessage(Message, Did), @@ -112,10 +109,10 @@ pub enum MessageHandlerEvent { SendMessage(Message, Did), /// Instructs the swarm to send a message as a response to the received message. - SendReportMessage(Payload, Message), + SendReportMessage(MessagePayload, Message), /// Instructs the swarm to send a message to a peer via the dht network with a specific next hop. - ResetDestination(Payload, Did), + ResetDestination(MessagePayload, Did), /// Instructs the swarm to store vnode. StorageStore(VirtualNode), @@ -138,11 +135,7 @@ pub struct MessageHandler { #[cfg_attr(not(feature = "wasm"), async_trait)] pub trait HandleMsg { /// Message handler. - async fn handle( - &self, - ctx: &MessagePayload, - msg: &T, - ) -> Result>; + async fn handle(&self, ctx: &MessagePayload, msg: &T) -> Result>; } impl MessageHandler { @@ -160,19 +153,26 @@ impl MessageHandler { } /// Invoke callback, which will be call after builtin handler. - async fn invoke_callback(&self, payload: &MessagePayload) -> Vec { + async fn invoke_callback( + &self, + payload: &MessagePayload, + message: &Message, + ) -> Vec { if let Some(ref cb) = *self.callback { - match payload.data { + match message { Message::CustomMessage(ref msg) => { - if self.dht.did == payload.relay.destination { - tracing::debug!("INVOKE CUSTOM MESSAGE CALLBACK {}", &payload.tx_id); + if self.dht.did == payload.transaction.destination { + tracing::debug!( + "INVOKE CUSTOM MESSAGE CALLBACK {}", + &payload.transaction.tx_id + ); return cb.custom_message(payload, msg).await; } } _ => return cb.builtin_message(payload).await, }; - } else if let Message::CustomMessage(ref msg) = payload.data { - if self.dht.did == payload.relay.destination { + } else if let Message::CustomMessage(ref msg) = message { + if self.dht.did == payload.transaction.destination { tracing::warn!("No callback registered, skip invoke_callback of {:?}", msg); } } @@ -180,7 +180,7 @@ impl MessageHandler { } /// Validate message. - async fn validate(&self, payload: &MessagePayload) -> Result<()> { + async fn validate(&self, payload: &MessagePayload) -> Result<()> { if let Some(ref v) = *self.validator { v.validate(payload) .await @@ -195,17 +195,22 @@ impl MessageHandler { #[cfg_attr(not(feature = "wasm"), async_recursion)] pub async fn handle_message( &self, - payload: &MessagePayload, + payload: &MessagePayload, ) -> Result> { + self.validate(payload).await?; + let message: Message = payload.transaction.data()?; + #[cfg(test)] { - println!("{} got msg {}", self.dht.did, &payload.data); + println!("{} got msg {}", self.dht.did, &message); } - tracing::debug!("START HANDLE MESSAGE: {} {}", &payload.tx_id, &payload.data); + tracing::debug!( + "START HANDLE MESSAGE: {} {}", + &payload.transaction.tx_id, + &message + ); - self.validate(payload).await?; - - let mut events = match &payload.data { + let mut events = match &message { Message::JoinDHT(ref msg) => self.handle(payload, msg).await, Message::LeaveDHT(ref msg) => self.handle(payload, msg).await, Message::ConnectNodeSend(ref msg) => self.handle(payload, msg).await, @@ -223,11 +228,11 @@ impl MessageHandler { Message::QueryForTopoInfoReport(ref msg) => self.handle(payload, msg).await, }?; - tracing::debug!("INVOKE CALLBACK {}", &payload.tx_id); + tracing::debug!("INVOKE CALLBACK {}", &payload.transaction.tx_id); - events.extend(self.invoke_callback(payload).await); + events.extend(self.invoke_callback(payload, &message).await); - tracing::debug!("FINISH HANDLE MESSAGE {}", &payload.tx_id); + tracing::debug!("FINISH HANDLE MESSAGE {}", &payload.transaction.tx_id); Ok(events) } } @@ -242,6 +247,7 @@ pub mod tests { use super::*; use crate::dht::Did; use crate::ecc::SecretKey; + use crate::message::MessageVerificationExt; use crate::message::PayloadSender; use crate::swarm::Swarm; use crate::tests::default::prepare_node_with_callback; @@ -262,22 +268,19 @@ pub mod tests { impl MessageCallback for MessageCallbackInstance { async fn custom_message( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, msg: &CustomMessage, ) -> Vec { self.handler_messages .lock() .await - .push((ctx.addr, msg.0.clone())); - println!("{:?}, {:?}, {:?}", ctx, ctx.addr, msg); + .push((ctx.signer(), msg.0.clone())); + println!("{:?}, {:?}, {:?}", ctx, ctx.signer(), msg); vec![] } - async fn builtin_message( - &self, - ctx: &MessagePayload, - ) -> Vec { - println!("{:?}, {:?}", ctx, ctx.addr); + async fn builtin_message(&self, ctx: &MessagePayload) -> Vec { + println!("{:?}, {:?}", ctx, ctx.signer()); vec![] } } diff --git a/core/src/message/handlers/stabilization.rs b/core/src/message/handlers/stabilization.rs index aa4feb776..906f75afc 100644 --- a/core/src/message/handlers/stabilization.rs +++ b/core/src/message/handlers/stabilization.rs @@ -19,7 +19,7 @@ use crate::message::MessagePayload; impl HandleMsg for MessageHandler { async fn handle( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, msg: &NotifyPredecessorSend, ) -> Result> { let predecessor = { *self.dht.lock_predecessor()? }; @@ -43,7 +43,7 @@ impl HandleMsg for MessageHandler { impl HandleMsg for MessageHandler { async fn handle( &self, - _ctx: &MessagePayload, + _ctx: &MessagePayload, msg: &NotifyPredecessorReport, ) -> Result> { let mut events = vec![MessageHandlerEvent::Connect(msg.did)]; @@ -77,6 +77,7 @@ mod test { use crate::message::handlers::connection::tests::test_only_two_nodes_establish_connection; use crate::message::handlers::tests::assert_no_more_msg; use crate::message::handlers::tests::wait_for_msgs; + use crate::message::MessageVerificationExt; use crate::swarm::Swarm; use crate::tests::default::prepare_node; use crate::tests::manually_establish_connection; @@ -170,46 +171,46 @@ mod test { // node2 notify node1 let ev1 = node1.listen_once().await.unwrap().0; - assert_eq!(ev1.addr, node2.did()); + assert_eq!(ev1.signer(), node2.did()); assert_eq!(ev1.relay.path, vec![node2.did()]); assert!(matches!( - ev1.data, + ev1.transaction.data()?, Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node2.did() )); // node1 notify node2 let ev2 = node2.listen_once().await.unwrap().0; - assert_eq!(ev2.addr, node1.did()); + assert_eq!(ev2.signer(), node1.did()); assert_eq!(ev2.relay.path, vec![node1.did()]); assert!(matches!( - ev2.data, + ev2.transaction.data()?, Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node1.did() )); // node2 notify node3 let ev3 = node3.listen_once().await.unwrap().0; - assert_eq!(ev3.addr, node2.did()); + assert_eq!(ev3.signer(), node2.did()); assert_eq!(ev3.relay.path, vec![node2.did()]); assert!(matches!( - ev3.data, + ev3.transaction.data()?, Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node2.did() )); // node3 notify node2 let ev2 = node2.listen_once().await.unwrap().0; - assert_eq!(ev2.addr, node3.did()); + assert_eq!(ev2.signer(), node3.did()); assert_eq!(ev2.relay.path, vec![node3.did()]); assert!(matches!( - ev2.data, + ev2.transaction.data()?, Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node3.did() )); // node2 report node3 let ev3 = node3.listen_once().await.unwrap().0; - assert_eq!(ev3.addr, node2.did()); + assert_eq!(ev3.signer(), node2.did()); assert_eq!(ev3.relay.path, vec![node2.did()]); assert!(matches!( - ev3.data, + ev3.transaction.data()?, Message::NotifyPredecessorReport(NotifyPredecessorReport{did}) if did == node1.did() )); @@ -262,82 +263,82 @@ mod test { // node1 notify node3 let ev3 = node3.listen_once().await.unwrap().0; - assert_eq!(ev3.addr, node1.did()); + assert_eq!(ev3.signer(), node1.did()); assert_eq!(ev3.relay.path, vec![node1.did()]); assert!(matches!( - ev3.data, + ev3.transaction.data()?, Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node1.did() )); // node2 notify node1 let ev1 = node1.listen_once().await.unwrap().0; - assert_eq!(ev1.addr, node2.did()); + assert_eq!(ev1.signer(), node2.did()); assert_eq!(ev1.relay.path, vec![node2.did()]); assert!(matches!( - ev1.data, + ev1.transaction.data()?, Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node2.did() )); // node3 notify node1 let ev1 = node1.listen_once().await.unwrap().0; - assert_eq!(ev1.addr, node3.did()); + assert_eq!(ev1.signer(), node3.did()); assert_eq!(ev1.relay.path, vec![node3.did()]); assert!(matches!( - ev1.data, + ev1.transaction.data()?, Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node3.did() )); // node1 notify node2 let ev2 = node2.listen_once().await.unwrap().0; - assert_eq!(ev2.addr, node1.did()); + assert_eq!(ev2.signer(), node1.did()); assert_eq!(ev2.relay.path, vec![node1.did()]); assert!(matches!( - ev2.data, + ev2.transaction.data()?, Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node1.did() )); // node3 notify node2 let ev2 = node2.listen_once().await.unwrap().0; - assert_eq!(ev2.addr, node3.did()); + assert_eq!(ev2.signer(), node3.did()); assert_eq!(ev2.relay.path, vec![node3.did()]); assert!(matches!( - ev2.data, + ev2.transaction.data()?, Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node3.did() )); // node2 notify node3 let ev3 = node3.listen_once().await.unwrap().0; - assert_eq!(ev3.addr, node2.did()); + assert_eq!(ev3.signer(), node2.did()); assert_eq!(ev3.relay.path, vec![node2.did()]); assert!(matches!( - ev2.data, + ev2.transaction.data()?, Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node3.did() )); // node1 report node3 let ev3 = node3.listen_once().await.unwrap().0; - assert_eq!(ev3.addr, node1.did()); + assert_eq!(ev3.signer(), node1.did()); assert_eq!(ev3.relay.path, vec![node1.did()]); assert!(matches!( - ev3.data, + ev3.transaction.data()?, Message::NotifyPredecessorReport(NotifyPredecessorReport{did}) if did == node2.did() )); // node2 report node3 let ev3 = node3.listen_once().await.unwrap().0; - assert_eq!(ev3.addr, node2.did()); + assert_eq!(ev3.signer(), node2.did()); assert_eq!(ev3.relay.path, vec![node2.did()]); assert!(matches!( - ev3.data, + ev3.transaction.data()?, Message::NotifyPredecessorReport(NotifyPredecessorReport{did}) if did == node1.did() )); // node3 report node1 let ev1 = node1.listen_once().await.unwrap().0; - assert_eq!(ev1.addr, node3.did()); + assert_eq!(ev1.signer(), node3.did()); assert_eq!(ev1.relay.path, vec![node3.did()]); assert!(matches!( - ev1.data, + ev1.transaction.data()?, Message::NotifyPredecessorReport(NotifyPredecessorReport{did}) if did == node2.did() )); @@ -417,46 +418,46 @@ mod test { // node2 notify node1 let ev1 = node1.listen_once().await.unwrap().0; - assert_eq!(ev1.addr, node2.did()); + assert_eq!(ev1.signer(), node2.did()); assert_eq!(ev1.relay.path, vec![node2.did()]); assert!(matches!( - ev1.data, + ev1.transaction.data()?, Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node2.did() )); // node1 notify node2 let ev2 = node2.listen_once().await.unwrap().0; - assert_eq!(ev2.addr, node1.did()); + assert_eq!(ev2.signer(), node1.did()); assert_eq!(ev2.relay.path, vec![node1.did()]); assert!(matches!( - ev2.data, + ev2.transaction.data()?, Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node1.did() )); // node3 notify node2 let ev2 = node2.listen_once().await.unwrap().0; - assert_eq!(ev2.addr, node3.did()); + assert_eq!(ev2.signer(), node3.did()); assert_eq!(ev2.relay.path, vec![node3.did()]); assert!(matches!( - ev2.data, + ev2.transaction.data()?, Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node3.did() )); // node2 notify node3 let ev3 = node3.listen_once().await.unwrap().0; - assert_eq!(ev3.addr, node2.did()); + assert_eq!(ev3.signer(), node2.did()); assert_eq!(ev3.relay.path, vec![node2.did()]); assert!(matches!( - ev3.data, + ev3.transaction.data()?, Message::NotifyPredecessorSend(NotifyPredecessorSend{did}) if did == node2.did() )); // node2 report node3 let ev3 = node3.listen_once().await.unwrap().0; - assert_eq!(ev3.addr, node2.did()); + assert_eq!(ev3.signer(), node2.did()); assert_eq!(ev3.relay.path, vec![node2.did()]); assert!(matches!( - ev3.data, + ev3.transaction.data()?, Message::NotifyPredecessorReport(NotifyPredecessorReport{did}) if did == node1.did() )); diff --git a/core/src/message/handlers/storage.rs b/core/src/message/handlers/storage.rs index d9183f3bc..b12c6684e 100644 --- a/core/src/message/handlers/storage.rs +++ b/core/src/message/handlers/storage.rs @@ -105,7 +105,7 @@ pub(super) async fn handle_storage_store_act(swarm: &Swarm, act: PeerRingAction) #[cfg_attr(feature = "wasm", async_recursion(?Send))] #[cfg_attr(not(feature = "wasm"), async_recursion)] pub(super) async fn handle_storage_operate_act( - ctx: &MessagePayload, + ctx: &MessagePayload, act: &PeerRingAction, ) -> Result> { match act { @@ -178,7 +178,7 @@ impl HandleMsg for MessageHandler { /// If a VNode is storead local, it will response immediately.(See Chordstorageinterface::storage_fetch) async fn handle( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, msg: &SearchVNode, ) -> Result> { // For relay message, set redundant to 1 @@ -207,7 +207,7 @@ impl HandleMsg for MessageHandler { impl HandleMsg for MessageHandler { async fn handle( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, msg: &FoundVNode, ) -> Result> { if self.dht.did != ctx.relay.destination { @@ -225,7 +225,7 @@ impl HandleMsg for MessageHandler { impl HandleMsg for MessageHandler { async fn handle( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, msg: &VNodeOperation, ) -> Result> { // For relay message, set redundant to 1 @@ -241,7 +241,7 @@ impl HandleMsg for MessageHandler { // received remote sync vnode request async fn handle( &self, - _ctx: &MessagePayload, + _ctx: &MessagePayload, msg: &SyncVNodeWithSuccessor, ) -> Result> { let mut events = vec![]; @@ -296,7 +296,7 @@ mod test { .unwrap(); let ev = node2.listen_once().await.unwrap().0; assert!(matches!( - ev.data, + ev.transaction.data()?, Message::OperateVNode(VNodeOperation::Overwrite(x)) if x.did == vid )); @@ -315,13 +315,13 @@ mod test { let ev = node2.listen_once().await.unwrap().0; // node2 received search vnode request assert!(matches!( - ev.data, + ev.transaction.data()?, Message::SearchVNode(x) if x.vid == vid )); let ev = node1.listen_once().await.unwrap().0; assert!(matches!( - ev.data, + ev.transaction.data()?, Message::FoundVNode(x) if x.data[0].did == vid )); @@ -375,7 +375,7 @@ mod test { let ev = node2.listen_once().await.unwrap().0; assert!(matches!( - ev.data, + ev.transaction.data()?, Message::OperateVNode(VNodeOperation::Extend(VirtualNode { did, data, kind: VNodeType::Data })) if did == vid && data == vec!["111".to_string().encode()?] )); @@ -388,7 +388,7 @@ mod test { .unwrap(); let ev = node2.listen_once().await.unwrap().0; assert!(matches!( - ev.data, + ev.transaction.data()?, Message::OperateVNode(VNodeOperation::Extend(VirtualNode { did, data, kind: VNodeType::Data })) if did == vid && data == vec!["222".to_string().encode()?] )); @@ -407,13 +407,13 @@ mod test { // node2 received search vnode request assert!(matches!( - ev.data, + ev.transaction.data()?, Message::SearchVNode(x) if x.vid == vid )); let ev = node1.listen_once().await.unwrap().0; assert!(matches!( - ev.data, + ev.transaction.data()?, Message::FoundVNode(x) if x.data[0].did == vid )); @@ -437,7 +437,7 @@ mod test { let ev = node2.listen_once().await.unwrap().0; assert!(matches!( - ev.data, + ev.transaction.data()?, Message::OperateVNode(VNodeOperation::Extend(VirtualNode { did, data, kind: VNodeType::Data })) if did == vid && data == vec!["333".to_string().encode()?] )); @@ -452,13 +452,13 @@ mod test { let ev = node2.listen_once().await.unwrap().0; // node2 received search vnode request assert!(matches!( - ev.data, + ev.transaction.data()?, Message::SearchVNode(x) if x.vid == vid )); let ev = node1.listen_once().await.unwrap().0; assert!(matches!( - ev.data, + ev.transaction.data()?, Message::FoundVNode(x) if x.data[0].did == vid )); diff --git a/core/src/message/mod.rs b/core/src/message/mod.rs index c07ecf4bc..d23894dc4 100644 --- a/core/src/message/mod.rs +++ b/core/src/message/mod.rs @@ -10,8 +10,8 @@ pub use payload::encode_data_gzip; pub use payload::from_gzipped_data; pub use payload::gzip_data; pub use payload::MessagePayload; -pub use payload::OriginVerificationGen; pub use payload::PayloadSender; +pub use payload::Transaction; pub mod types; pub use types::*; @@ -29,3 +29,4 @@ pub use handlers::ValidatorFn; mod protocols; pub use protocols::MessageRelay; +pub use protocols::MessageVerificationExt; diff --git a/core/src/message/payload.rs b/core/src/message/payload.rs index 4ee15a4eb..9d219b1b3 100644 --- a/core/src/message/payload.rs +++ b/core/src/message/payload.rs @@ -16,17 +16,15 @@ use super::encoder::Encoded; use super::encoder::Encoder; use super::protocols::MessageRelay; use super::protocols::MessageVerification; -use crate::consts::DEFAULT_TTL_MS; -use crate::consts::MAX_TTL_MS; -use crate::consts::TS_OFFSET_TOLERANCE_MS; +use super::protocols::MessageVerificationExt; use crate::dht::Chord; use crate::dht::Did; use crate::dht::PeerRing; use crate::dht::PeerRingAction; +use crate::ecc::keccak256; use crate::error::Error; use crate::error::Result; use crate::session::SessionSk; -use crate::utils::get_epoch_ms; /// Compresses the given data byte slice using the gzip algorithm with the specified compression level. pub fn encode_data_gzip(data: &Bytes, level: u8) -> Result { @@ -61,13 +59,30 @@ where T: DeserializeOwned { Ok(m) } -/// An enumeration of options for generating origin verification or stick verification. -/// Verification can be Stick Verification or origin verification. -/// When MessagePayload created, Origin Verification is always generated. -/// and if OriginVerificationGen is stick, it can including existing stick ov -pub enum OriginVerificationGen { - Origin, - Stick(MessageVerification), +fn hash_transaction(destination: Did, tx_id: uuid::Uuid, data: &[u8]) -> [u8; 32] { + let mut msg = vec![]; + + msg.extend_from_slice(destination.as_bytes()); + msg.extend_from_slice(tx_id.as_bytes()); + msg.extend_from_slice(data); + + keccak256(&msg) +} + +#[derive(Derivative, Deserialize, Serialize, Clone, PartialEq, Eq)] +#[derivative(Debug)] +pub struct Transaction { + /// The destination of this message. + pub destination: Did, + /// The transaction ID. + /// Remote peer should use same tx_id when response. + pub tx_id: uuid::Uuid, + /// data + pub data: Vec, + /// This field hold a signature from a node, + /// which is used to prove that the transaction was created by that node. + #[derivative(Debug = "ignore")] + pub verification: MessageVerification, } /// All messages transmitted in RingsNetwork should be wrapped by MessagePayload. @@ -75,128 +90,82 @@ pub enum OriginVerificationGen { /// and origin verification. #[derive(Derivative, Deserialize, Serialize, Clone, PartialEq, Eq)] #[derivative(Debug)] -pub struct MessagePayload { +pub struct MessagePayload { /// Payload data - pub data: T, - /// The transaction ID of payload. - /// Remote peer should use same tx_id when response. - pub tx_id: uuid::Uuid, - /// The did of payload account, usually it's last sender. - pub addr: Did, + pub transaction: Transaction, /// Relay records the transport path of message. /// And can also help message sender to find the next hop. pub relay: MessageRelay, /// This field hold a signature from a node, - /// which is used to prove that the message was sent from that node. + /// which is used to prove that the transaction was created by that node. #[derivative(Debug = "ignore")] pub verification: MessageVerification, - /// Same as verification, but the signature was from the original sender. - #[derivative(Debug = "ignore")] - pub origin_verification: MessageVerification, } -impl MessagePayload -where T: Serialize + DeserializeOwned -{ +impl Transaction { + pub fn new( + destination: Did, + tx_id: uuid::Uuid, + data: T, + session_sk: &SessionSk, + ) -> Result + where + T: Serialize, + { + let data = bincode::serialize(&data).map_err(Error::BincodeSerialize)?; + let msg_hash = hash_transaction(destination, tx_id, &data); + let verification = MessageVerification::new(&msg_hash, session_sk)?; + Ok(Self { + destination, + tx_id, + data, + verification, + }) + } + + pub fn data(&self) -> Result + where T: DeserializeOwned { + bincode::deserialize(&self.data).map_err(Error::BincodeDeserialize) + } +} + +impl MessagePayload { /// Create new instance pub fn new( - data: T, + transaction: Transaction, session_sk: &SessionSk, - origin_verification_gen: OriginVerificationGen, relay: MessageRelay, ) -> Result { - let ts_ms = get_epoch_ms(); - let ttl_ms = DEFAULT_TTL_MS; - let msg = &MessageVerification::pack_msg(&data, ts_ms, ttl_ms)?; - let tx_id = uuid::Uuid::new_v4(); - let addr = session_sk.account_did(); - let verification = MessageVerification { - session: session_sk.session(), - sig: session_sk.sign(msg)?, - ttl_ms, - ts_ms, - }; - // If origin_verification_gen is set to Origin, simply clone it into. - let origin_verification = match origin_verification_gen { - OriginVerificationGen::Origin => verification.clone(), - OriginVerificationGen::Stick(ov) => ov, - }; - + let msg_hash = hash_transaction( + transaction.destination, + transaction.tx_id, + &transaction.data, + ); + let verification = MessageVerification::new(&msg_hash, session_sk)?; Ok(Self { - data, - tx_id, - addr, - verification, - origin_verification, + transaction, relay, + verification, }) } - /// Create new Payload for send - pub fn new_send( + pub fn new_send( data: T, session_sk: &SessionSk, next_hop: Did, destination: Did, - ) -> Result { - let relay = MessageRelay::new(vec![session_sk.account_did()], next_hop, destination); - Self::new(data, session_sk, OriginVerificationGen::Origin, relay) - } - - /// Checks whether the payload is expired. - pub fn is_expired(&self) -> bool { - if self.verification.ttl_ms > MAX_TTL_MS { - return false; - } - - if self.origin_verification.ttl_ms > MAX_TTL_MS { - return false; - } - - let now = get_epoch_ms(); - - if self.verification.ts_ms - TS_OFFSET_TOLERANCE_MS > now { - return false; - } - - if self.origin_verification.ts_ms - TS_OFFSET_TOLERANCE_MS > now { - return false; - } - - now > self.verification.ts_ms + self.verification.ttl_ms as u128 - && now > self.origin_verification.ts_ms + self.origin_verification.ttl_ms as u128 - } - - /// Verifies that the payload is not expired and that the signature is valid. - pub fn verify(&self) -> bool { - tracing::debug!("verifying payload: {:?}", self.tx_id); - - if self.is_expired() { - tracing::warn!("message expired"); - return false; - } - - if Some(self.relay.origin_sender()) != self.origin_account_did().ok() { - tracing::warn!("sender is not origin_verification generator"); - return false; - } - - self.verification.verify(&self.data) && self.origin_verification.verify(&self.data) - } - - /// Get Did from the origin verification. - pub fn origin_account_did(&self) -> Result { - Ok(self - .origin_verification - .session - .account_pubkey()? - .address() - .into()) - } - - /// Get did from sender verification. - pub fn account_did(&self) -> Result { - Ok(self.verification.session.account_pubkey()?.address().into()) + ) -> Result + where + T: Serialize, + { + let tx_id = uuid::Uuid::new_v4(); + let transaction = Transaction::new(destination, tx_id, data, session_sk)?; + let relay = MessageRelay::new( + vec![session_sk.account_did()], + next_hop, + transaction.destination, + ); + Self::new(transaction, session_sk, relay) } /// Deserializes a `MessagePayload` instance from the given binary data. @@ -210,29 +179,40 @@ where T: Serialize + DeserializeOwned .map(Bytes::from) .map_err(Error::BincodeSerialize) } +} + +impl MessageVerificationExt for Transaction { + fn verification_data(&self) -> Result> { + Ok(hash_transaction(self.destination, self.tx_id, &self.data).to_vec()) + } + + fn verification(&self) -> &MessageVerification { + &self.verification + } +} - /// Did of Sender - pub fn sender(&self) -> Result { - self.account_did() +impl MessageVerificationExt for MessagePayload { + fn verification_data(&self) -> Result> { + Ok(hash_transaction( + self.transaction.destination, + self.transaction.tx_id, + &self.transaction.data, + ) + .to_vec()) } - /// Did of Origin - pub fn origin(&self) -> Result { - self.origin_account_did() + fn verification(&self) -> &MessageVerification { + &self.verification } } -impl Encoder for MessagePayload -where T: Serialize + DeserializeOwned -{ +impl Encoder for MessagePayload { fn encode(&self) -> Result { self.to_bincode()?.encode() } } -impl Decoder for MessagePayload -where T: Serialize + DeserializeOwned -{ +impl Decoder for MessagePayload { fn from_encoded(encoded: &Encoded) -> Result { let v: Bytes = encoded.decode()?; Self::from_bincode(&v) @@ -242,15 +222,13 @@ where T: Serialize + DeserializeOwned /// Trait of PayloadSender #[cfg_attr(feature = "wasm", async_trait(?Send))] #[cfg_attr(not(feature = "wasm"), async_trait)] -pub trait PayloadSender -where T: Clone + Serialize + DeserializeOwned + Send + Sync + 'static -{ +pub trait PayloadSender { /// Get the session sk fn session_sk(&self) -> &SessionSk; /// Get access to DHT. fn dht(&self) -> Arc; /// Send a message payload to a specified DID. - async fn do_send_payload(&self, did: Did, payload: MessagePayload) -> Result<()>; + async fn do_send_payload(&self, did: Did, payload: MessagePayload) -> Result<()>; /// Infer the next hop for a message by calling `dht.find_successor()`. fn infer_next_hop(&self, next_hop: Option, destination: Did) -> Result { if let Some(next_hop) = next_hop { @@ -264,78 +242,71 @@ where T: Clone + Serialize + DeserializeOwned + Send + Sync + 'static } } /// Alias for `do_send_payload` that sets the next hop to `payload.relay.next_hop`. - async fn send_payload(&self, payload: MessagePayload) -> Result<()> { + async fn send_payload(&self, payload: MessagePayload) -> Result<()> { self.do_send_payload(payload.relay.next_hop, payload).await } - /// Send a message to a specified destination. - async fn send_message(&self, msg: T, destination: Did) -> Result { - let next_hop = self.infer_next_hop(None, destination)?; - let payload = MessagePayload::new_send(msg, self.session_sk(), next_hop, destination)?; - self.send_payload(payload.clone()).await?; - Ok(payload.tx_id) - } - /// Send a message to a specified destination by specified next hop. - async fn send_message_by_hop( + async fn send_message_by_hop( &self, msg: T, destination: Did, next_hop: Did, - ) -> Result { + ) -> Result + where + T: Serialize + Send, + { let payload = MessagePayload::new_send(msg, self.session_sk(), next_hop, destination)?; - self.send_payload(payload.clone()).await?; - Ok(payload.tx_id) + let tx_id = payload.transaction.tx_id; + self.send_payload(payload).await?; + Ok(tx_id) } + /// Send a message to a specified destination. + async fn send_message(&self, msg: T, destination: Did) -> Result + where T: Serialize + Send { + let next_hop = self.infer_next_hop(None, destination)?; + self.send_message_by_hop(msg, destination, next_hop).await + } /// Send a direct message to a specified destination. - async fn send_direct_message(&self, msg: T, destination: Did) -> Result { - let payload = MessagePayload::new_send(msg, self.session_sk(), destination, destination)?; - self.send_payload(payload.clone()).await?; - Ok(payload.tx_id) + async fn send_direct_message(&self, msg: T, destination: Did) -> Result + where T: Serialize + Send { + self.send_message_by_hop(msg, destination, destination) + .await } /// Send a report message to a specified destination. - async fn send_report_message(&self, payload: &MessagePayload, msg: T) -> Result<()> { + async fn send_report_message(&self, payload: &MessagePayload, msg: T) -> Result<()> + where T: Serialize + Send { let relay = payload.relay.report(self.dht().did)?; - let mut pl = - MessagePayload::new(msg, self.session_sk(), OriginVerificationGen::Origin, relay)?; - pl.tx_id = payload.tx_id; + let transaction = Transaction::new( + relay.destination, + payload.transaction.tx_id, + msg, + self.session_sk(), + )?; + let pl = MessagePayload::new(transaction, self.session_sk(), relay)?; self.send_payload(pl).await } /// Forward a payload message by relay. /// It just create a new payload, cloned data, resigned with session and send - async fn forward_by_relay( - &self, - payload: &MessagePayload, - relay: MessageRelay, - ) -> Result<()> { - let mut new_pl = MessagePayload::new( - payload.data.clone(), - self.session_sk(), - OriginVerificationGen::Stick(payload.origin_verification.clone()), - relay, - )?; - new_pl.tx_id = payload.tx_id; + async fn forward_by_relay(&self, payload: &MessagePayload, relay: MessageRelay) -> Result<()> { + let new_pl = MessagePayload::new(payload.transaction.clone(), self.session_sk(), relay)?; self.send_payload(new_pl).await } /// Forward a payload message, with the next hop inferred by the DHT. - async fn forward_payload( - &self, - payload: &MessagePayload, - next_hop: Option, - ) -> Result<()> { + async fn forward_payload(&self, payload: &MessagePayload, next_hop: Option) -> Result<()> { let next_hop = self.infer_next_hop(next_hop, payload.relay.destination)?; let relay = payload.relay.forward(self.dht().did, next_hop)?; self.forward_by_relay(payload, relay).await } /// Reset the destination to a secp DID. - async fn reset_destination(&self, payload: &MessagePayload, next_hop: Did) -> Result<()> { + async fn reset_destination(&self, payload: &MessagePayload, next_hop: Did) -> Result<()> { let relay = payload .relay .reset_destination(next_hop) @@ -360,7 +331,7 @@ pub mod test { d: bool, } - pub fn new_test_payload(next_hop: Did) -> MessagePayload { + pub fn new_test_payload(next_hop: Did) -> MessagePayload { let test_data = TestData { a: "hello".to_string(), b: 111, @@ -370,12 +341,12 @@ pub mod test { new_payload(test_data, next_hop) } - pub fn new_payload(data: T, next_hop: Did) -> MessagePayload + pub fn new_payload(data: T, next_hop: Did) -> MessagePayload where T: Serialize + DeserializeOwned { let key = SecretKey::random(); let destination = SecretKey::random().address().into(); - let session = SessionSk::new_with_seckey(&key).unwrap(); - MessagePayload::new_send(data, &session, next_hop, destination).unwrap() + let session_sk = SessionSk::new_with_seckey(&key).unwrap(); + MessagePayload::new_send(data, &session_sk, next_hop, destination).unwrap() } #[test] @@ -393,11 +364,11 @@ pub mod test { let payload = new_test_payload(next_hop); let gzipped_encoded_payload = payload.encode().unwrap(); - let payload2: MessagePayload = gzipped_encoded_payload.decode().unwrap(); + let payload2: MessagePayload = gzipped_encoded_payload.decode().unwrap(); assert_eq!(payload, payload2); let gunzip_encoded_payload = payload.to_bincode().unwrap().encode().unwrap(); - let payload2: MessagePayload = gunzip_encoded_payload.decode().unwrap(); + let payload2: MessagePayload = gunzip_encoded_payload.decode().unwrap(); assert_eq!(payload, payload2); } diff --git a/core/src/message/protocols/mod.rs b/core/src/message/protocols/mod.rs index ae036c9df..932b155b9 100644 --- a/core/src/message/protocols/mod.rs +++ b/core/src/message/protocols/mod.rs @@ -3,3 +3,4 @@ mod verify; pub use self::relay::MessageRelay; pub use self::verify::MessageVerification; +pub use self::verify::MessageVerificationExt; diff --git a/core/src/message/protocols/verify.rs b/core/src/message/protocols/verify.rs index fcadf86be..daf5964eb 100644 --- a/core/src/message/protocols/verify.rs +++ b/core/src/message/protocols/verify.rs @@ -1,35 +1,56 @@ -//! Implementation of Message Verification. #![warn(missing_docs)] -use std::fmt::Write; +//! Implementation of Message Verification. use serde::Deserialize; use serde::Serialize; -use crate::ecc::signers; -use crate::ecc::PublicKey; -use crate::error::Error; +use crate::consts::DEFAULT_TTL_MS; +use crate::consts::MAX_TTL_MS; +use crate::consts::TS_OFFSET_TOLERANCE_MS; +use crate::dht::Did; use crate::error::Result; use crate::session::Session; +use crate::session::SessionSk; +use crate::utils::get_epoch_ms; /// Message Verification is based on session, and sig. /// it also included ttl time and created ts. #[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] pub struct MessageVerification { pub session: Session, - pub ttl_ms: usize, + pub ttl_ms: u64, pub ts_ms: u128, pub sig: Vec, } +fn pack_msg(data: &[u8], ts_ms: u128, ttl_ms: u64) -> Vec { + let mut msg = vec![]; + + msg.extend_from_slice(&ts_ms.to_be_bytes()); + msg.extend_from_slice(&ttl_ms.to_be_bytes()); + msg.extend_from_slice(data); + + msg +} + impl MessageVerification { - /// Verify a MessageVerification - pub fn verify(&self, data: &T) -> bool - where T: Serialize { - let Ok(msg) = self.msg(data) else { - tracing::warn!("MessageVerification pack_msg failed"); - return false; + pub fn new(data: &[u8], session_sk: &SessionSk) -> Result { + let ts_ms = get_epoch_ms(); + let ttl_ms = DEFAULT_TTL_MS; + let msg = pack_msg(data, ts_ms, ttl_ms); + let verification = MessageVerification { + session: session_sk.session(), + sig: session_sk.sign(&msg)?, + ttl_ms, + ts_ms, }; + Ok(verification) + } + + /// Verify a MessageVerification + pub fn verify(&self, data: &[u8]) -> bool { + let msg = pack_msg(data, self.ts_ms, self.ttl_ms); self.session .verify(&msg, &self.sig) @@ -38,25 +59,49 @@ impl MessageVerification { }) .is_ok() } +} - /// Recover publickey from packed message. - pub fn session_pubkey(&self, data: &T) -> Result - where T: Serialize { - let msg = self.msg(data)?; - signers::secp256k1::recover(&msg, &self.sig) +/// This trait helps a struct with `MessageVerification` field to `verify` itself. +/// It also provides a `signer` method to let receiver know who sent the message. +pub trait MessageVerificationExt { + /// Give the data to be verified. + fn verification_data(&self) -> Result>; + + /// Give the verification field for verifying. + fn verification(&self) -> &MessageVerification; + + /// Checks whether the message is expired. + fn is_expired(&self) -> bool { + if self.verification().ttl_ms > MAX_TTL_MS { + return false; + } + + let now = get_epoch_ms(); + + if self.verification().ts_ms - TS_OFFSET_TOLERANCE_MS > now { + return false; + } + + now > self.verification().ts_ms + self.verification().ttl_ms as u128 } - /// Pack Message to string, and attach ts and ttl on it. - pub fn pack_msg(data: &T, ts_ms: u128, ttl_ms: usize) -> Result - where T: Serialize { - let mut msg = serde_json::to_string(data).map_err(|_| Error::SerializeToString)?; - write!(msg, "\n{}\n{}", ts_ms, ttl_ms).map_err(|_| Error::SerializeToString)?; - Ok(msg) + /// Verifies that the message is not expired and that the signature is valid. + fn verify(&self) -> bool { + if self.is_expired() { + tracing::warn!("message expired"); + return false; + } + + let Ok(data) = self.verification_data() else { + tracing::warn!("MessageVerificationExt verify get verification_data failed"); + return false; + }; + + self.verification().verify(&data) } - /// Alias of pack_msg. - fn msg(&self, data: &T) -> Result - where T: Serialize { - Self::pack_msg(data, self.ts_ms, self.ttl_ms) + /// Get signer did from verification. + fn signer(&self) -> Did { + self.verification().session.account_did() } } diff --git a/core/src/session.rs b/core/src/session.rs index e94ea94c4..415a2183e 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -47,6 +47,7 @@ use serde::Serialize; use crate::consts::DEFAULT_SESSION_TTL_MS; use crate::dht::Did; +use crate::ecc::keccak256; use crate::ecc::signers; use crate::ecc::PublicKey; use crate::ecc::SecretKey; @@ -54,7 +55,7 @@ use crate::error::Error; use crate::error::Result; use crate::utils; -fn pack_session(session_id: Did, ts_ms: u128, ttl_ms: usize) -> String { +fn pack_session(session_id: Did, ts_ms: u128, ttl_ms: u64) -> String { format!("{}\n{}\n{}", session_id, ts_ms, ttl_ms) } @@ -72,7 +73,7 @@ pub struct SessionSkBuilder { /// Account of session. account_type: String, /// Session's lifetime - ttl_ms: usize, + ttl_ms: u64, /// Timestamp when session created ts_ms: u128, /// Signature of session @@ -109,7 +110,7 @@ pub struct Session { /// Account of session account: Account, /// Session's lifetime - ttl_ms: usize, + ttl_ms: u64, /// Timestamp when session created ts_ms: u128, /// Signature to verify that the session was signed by the account. @@ -196,7 +197,7 @@ impl SessionSkBuilder { } /// Set the lifetime of session. - pub fn set_ttl(mut self, ttl_ms: usize) -> Self { + pub fn set_ttl(mut self, ttl_ms: u64) -> Self { self.ttl_ms = ttl_ms; self } @@ -223,8 +224,10 @@ impl SessionSkBuilder { impl Session { /// Pack the session into a string for verification or public key recovery. - pub fn pack(&self) -> String { + pub fn pack(&self) -> Vec { pack_session(self.session_id, self.ts_ms, self.ttl_ms) + .as_bytes() + .to_vec() } /// Check session is expired or not. @@ -239,16 +242,16 @@ impl Session { return Err(Error::SessionExpired); } - let auth_str = self.pack(); + let auth_bytes = self.pack(); if !(match self.account { Account::Secp256k1(did) => { - signers::secp256k1::verify(&auth_str, &did.into(), &self.sig) + signers::secp256k1::verify(&auth_bytes, &did.into(), &self.sig) } - Account::EIP191(did) => signers::eip191::verify(&auth_str, &did.into(), &self.sig), - Account::BIP137(did) => signers::bip137::verify(&auth_str, &did.into(), &self.sig), + Account::EIP191(did) => signers::eip191::verify(&auth_bytes, &did.into(), &self.sig), + Account::BIP137(did) => signers::bip137::verify(&auth_bytes, &did.into(), &self.sig), Account::Ed25519(pk) => { - signers::ed25519::verify(&auth_str, &pk.address(), &self.sig, pk) + signers::ed25519::verify(&auth_bytes, &pk.address(), &self.sig, pk) } }) { return Err(Error::VerifySignatureFailed); @@ -258,7 +261,7 @@ impl Session { } /// Verify message. - pub fn verify(&self, msg: &str, sig: impl AsRef<[u8]>) -> Result<()> { + pub fn verify(&self, msg: &[u8], sig: impl AsRef<[u8]>) -> Result<()> { self.verify_self()?; if !signers::secp256k1::verify(msg, &self.session_id, sig) { return Err(Error::VerifySignatureFailed); @@ -268,11 +271,11 @@ impl Session { /// Get public key from session for encryption. pub fn account_pubkey(&self) -> Result { - let auth_str = self.pack(); + let auth_bytes = self.pack(); match self.account { - Account::Secp256k1(_) => signers::secp256k1::recover(&auth_str, &self.sig), - Account::BIP137(_) => signers::bip137::recover(&auth_str, &self.sig), - Account::EIP191(_) => signers::eip191::recover(&auth_str, &self.sig), + Account::Secp256k1(_) => signers::secp256k1::recover(&auth_bytes, &self.sig), + Account::BIP137(_) => signers::bip137::recover(&auth_bytes, &self.sig), + Account::EIP191(_) => signers::eip191::recover(&auth_bytes, &self.sig), Account::Ed25519(pk) => Ok(pk), } } @@ -309,9 +312,10 @@ impl SessionSk { } /// Sign message with session. - pub fn sign(&self, msg: &str) -> Result> { + pub fn sign(&self, msg: &[u8]) -> Result> { let key = self.sk; - Ok(signers::secp256k1::sign_raw(key, msg).to_vec()) + let h = keccak256(msg); + Ok(signers::secp256k1::sign(key, &h).to_vec()) } /// Get account did from session. diff --git a/core/src/swarm/impls.rs b/core/src/swarm/impls.rs index 80f65724f..079f81a6e 100644 --- a/core/src/swarm/impls.rs +++ b/core/src/swarm/impls.rs @@ -11,6 +11,7 @@ use crate::message::ConnectNodeReport; use crate::message::ConnectNodeSend; use crate::message::Message; use crate::message::MessagePayload; +use crate::message::MessageVerificationExt; use crate::message::PayloadSender; use crate::swarm::Swarm; use crate::types::Connection; @@ -38,21 +39,18 @@ pub trait ConnectionHandshake { /// Creaet new connection and its answer. This function will wrap the offer inside a payload /// with verification. - async fn create_offer(&self, peer: Did) -> Result<(Connection, MessagePayload)>; + async fn create_offer(&self, peer: Did) -> Result<(Connection, MessagePayload)>; /// Answer the offer of remote connection. This function will verify the answer payload and /// will wrap the answer inside a payload with verification. async fn answer_offer( &self, - offer_payload: MessagePayload, - ) -> Result<(Connection, MessagePayload)>; + offer_payload: MessagePayload, + ) -> Result<(Connection, MessagePayload)>; /// Accept the answer of remote connection. This function will verify the answer payload and /// will return its did with the connection. - async fn accept_answer( - &self, - answer_payload: MessagePayload, - ) -> Result<(Did, Connection)>; + async fn accept_answer(&self, answer_payload: MessagePayload) -> Result<(Did, Connection)>; } /// A trait for managing connections. @@ -248,7 +246,7 @@ impl ConnectionHandshake for Swarm { Ok(conn) } - async fn create_offer(&self, peer: Did) -> Result<(Connection, MessagePayload)> { + async fn create_offer(&self, peer: Did) -> Result<(Connection, MessagePayload)> { let (conn, offer_msg) = self.prepare_connection_offer(peer).await?; // This payload has fake next_hop. @@ -265,13 +263,13 @@ impl ConnectionHandshake for Swarm { async fn answer_offer( &self, - offer_payload: MessagePayload, - ) -> Result<(Connection, MessagePayload)> { + offer_payload: MessagePayload, + ) -> Result<(Connection, MessagePayload)> { if !offer_payload.verify() { return Err(Error::VerifySignatureFailed); } - let Message::ConnectNodeSend(msg) = offer_payload.data else { + let Message::ConnectNodeSend(msg) = offer_payload.transaction.data()? else { return Err(Error::InvalidMessage( "Should be ConnectNodeSend".to_string(), )); @@ -292,17 +290,14 @@ impl ConnectionHandshake for Swarm { Ok((conn, answer_payload)) } - async fn accept_answer( - &self, - answer_payload: MessagePayload, - ) -> Result<(Did, Connection)> { + async fn accept_answer(&self, answer_payload: MessagePayload) -> Result<(Did, Connection)> { tracing::debug!("accept_answer: {:?}", answer_payload); if !answer_payload.verify() { return Err(Error::VerifySignatureFailed); } - let Message::ConnectNodeReport(ref msg) = answer_payload.data else { + let Message::ConnectNodeReport(ref msg) = answer_payload.transaction.data()? else { return Err(Error::InvalidMessage( "Should be ConnectNodeReport".to_string(), )); diff --git a/core/src/swarm/mod.rs b/core/src/swarm/mod.rs index 3460eebc2..d2b7c41aa 100644 --- a/core/src/swarm/mod.rs +++ b/core/src/swarm/mod.rs @@ -6,7 +6,6 @@ pub(crate) mod callback; pub mod impls; mod types; -use std::fmt; use std::sync::Arc; use async_recursion::async_recursion; @@ -18,8 +17,6 @@ use rings_transport::core::transport::BoxedTransport; use rings_transport::core::transport::ConnectionInterface; use rings_transport::core::transport::TransportMessage; use rings_transport::error::Error as TransportError; -use serde::de::DeserializeOwned; -use serde::Serialize; pub use types::MeasureImpl; pub use types::WrappedDid; @@ -38,6 +35,7 @@ use crate::message::Message; use crate::message::MessageHandler; use crate::message::MessageHandlerEvent; use crate::message::MessagePayload; +use crate::message::MessageVerificationExt; use crate::message::PayloadSender; use crate::session::SessionSk; use crate::swarm::impls::ConnectionHandshake; @@ -80,7 +78,7 @@ impl Swarm { } /// Load message from a TransportEvent. - async fn load_message(&self, ev: TransportEvent) -> Result>> { + async fn load_message(&self, ev: TransportEvent) -> Result> { match ev { TransportEvent::DataChannelMessage(msg) => { let payload = MessagePayload::from_bincode(&msg)?; @@ -113,7 +111,7 @@ impl Swarm { /// This method is required because web-sys components is not `Send` /// which means an async loop cannot running concurrency. - pub async fn poll_message(&self) -> Option> { + pub async fn poll_message(&self) -> Option { let receiver = &self.transport_event_channel.receiver(); match Channel::recv(receiver).await { Ok(Some(ev)) => match self.load_message(ev).await { @@ -132,10 +130,10 @@ impl Swarm { /// This method is required because web-sys components is not `Send` /// This method will return events already consumed (landed), which is ok to be ignore. /// which means a listening loop cannot running concurrency. - pub async fn listen_once(&self) -> Option<(MessagePayload, Vec)> { + pub async fn listen_once(&self) -> Option<(MessagePayload, Vec)> { let payload = self.poll_message().await?; - if !payload.verify() { + if !(payload.verify() && payload.transaction.verify()) { tracing::error!("Cannot verify msg or it's expired: {:?}", payload); return None; } @@ -309,9 +307,7 @@ impl Swarm { #[cfg_attr(feature = "wasm", async_trait(?Send))] #[cfg_attr(not(feature = "wasm"), async_trait)] -impl PayloadSender for Swarm -where T: Clone + Serialize + DeserializeOwned + Send + Sync + 'static + fmt::Debug -{ +impl PayloadSender for Swarm { fn session_sk(&self) -> &SessionSk { Swarm::session_sk(self) } @@ -320,7 +316,7 @@ where T: Clone + Serialize + DeserializeOwned + Send + Sync + 'static + fmt::Deb Swarm::dht(self) } - async fn do_send_payload(&self, did: Did, payload: MessagePayload) -> Result<()> { + async fn do_send_payload(&self, did: Did, payload: MessagePayload) -> Result<()> { #[cfg(test)] { println!("+++++++++++++++++++++++++++++++++"); diff --git a/node/src/backend/extension.rs b/node/src/backend/extension.rs index 1d611dd3f..6647e5ff4 100644 --- a/node/src/backend/extension.rs +++ b/node/src/backend/extension.rs @@ -135,7 +135,7 @@ impl MessageEndpoint for Extension { /// Handles the incoming message by passing it to the extension handlers and returning the resulting events. async fn handle_message( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, data: &BackendMessage, ) -> Result> { let mut ret = vec![]; diff --git a/node/src/backend/service/http_server.rs b/node/src/backend/service/http_server.rs index 4d3aa8eb5..85cbe4497 100644 --- a/node/src/backend/service/http_server.rs +++ b/node/src/backend/service/http_server.rs @@ -127,7 +127,7 @@ impl HttpServer { impl MessageEndpoint for HttpServer { async fn handle_message( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, msg: &BackendMessage, ) -> Result> { let req: HttpRequest = bincode::deserialize(&msg.data).map_err(|_| Error::DecodeError)?; diff --git a/node/src/backend/service/mod.rs b/node/src/backend/service/mod.rs index 6e960dc6e..fa59b3dff 100644 --- a/node/src/backend/service/mod.rs +++ b/node/src/backend/service/mod.rs @@ -30,7 +30,6 @@ use crate::error::Result; use crate::prelude::rings_core::chunk::Chunk; use crate::prelude::rings_core::chunk::ChunkList; use crate::prelude::rings_core::chunk::ChunkManager; -use crate::prelude::rings_core::message::Message; use crate::prelude::*; /// A Backend struct contains http_server. @@ -100,7 +99,7 @@ impl MessageCallback for Backend { /// And send http request to localhost through Backend http_request handler. async fn custom_message( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, msg: &CustomMessage, ) -> Vec { let msg = msg.0.clone(); @@ -159,7 +158,7 @@ impl MessageCallback for Backend { } } - async fn builtin_message(&self, _ctx: &MessagePayload) -> Vec { + async fn builtin_message(&self, _ctx: &MessagePayload) -> Vec { vec![] } } diff --git a/node/src/backend/service/text.rs b/node/src/backend/service/text.rs index 49163f5d6..70b9c8778 100644 --- a/node/src/backend/service/text.rs +++ b/node/src/backend/service/text.rs @@ -18,7 +18,7 @@ pub struct TextEndpoint; impl MessageEndpoint for TextEndpoint { async fn handle_message( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, data: &BackendMessage, ) -> Result> { let text = str::from_utf8(&data.data).map_err(|_| Error::InvalidMessage)?; diff --git a/node/src/backend/service/utils.rs b/node/src/backend/service/utils.rs index ddcfff938..96cd341ad 100644 --- a/node/src/backend/service/utils.rs +++ b/node/src/backend/service/utils.rs @@ -6,7 +6,7 @@ use crate::prelude::*; /// send chunk report message pub async fn send_chunk_report_message( - ctx: &MessagePayload, + ctx: &MessagePayload, data: &[u8], ) -> Result { let mut new_bytes: Vec = Vec::with_capacity(data.len() + 1); diff --git a/node/src/backend/types.rs b/node/src/backend/types.rs index 0ec1bbee1..7930843a4 100644 --- a/node/src/backend/types.rs +++ b/node/src/backend/types.rs @@ -157,7 +157,7 @@ pub trait MessageEndpoint { /// handle_message async fn handle_message( &self, - ctx: &MessagePayload, + ctx: &MessagePayload, data: &BackendMessage, ) -> Result>; } diff --git a/node/src/browser/client.rs b/node/src/browser/client.rs index 317484927..3c2481209 100644 --- a/node/src/browser/client.rs +++ b/node/src/browser/client.rs @@ -13,7 +13,6 @@ use rings_core::async_trait; use rings_core::dht::Did; use rings_core::ecc::PublicKey; use rings_core::message::CustomMessage; -use rings_core::message::Message; use rings_core::message::MessageCallback; use rings_core::message::MessageHandlerEvent; use rings_core::message::MessagePayload; @@ -673,7 +672,7 @@ impl MessageCallbackInstance { impl MessageCallbackInstance { pub async fn handle_message_data( &self, - relay: &MessagePayload, + relay: &MessagePayload, data: &Bytes, ) -> anyhow::Result<()> { let m = BackendMessage::try_from(data.to_vec()).map_err(|e| anyhow::anyhow!("{}", e))?; @@ -694,7 +693,7 @@ impl MessageCallbackInstance { pub async fn handle_simple_text_message( &self, - relay: &MessagePayload, + relay: &MessagePayload, data: &[u8], ) -> anyhow::Result<()> { log::debug!("custom_message received: {:?}", data); @@ -717,20 +716,20 @@ impl MessageCallbackInstance { pub async fn handle_http_response( &self, - relay: &MessagePayload, + relay: &MessagePayload, data: &[u8], ) -> anyhow::Result<()> { let msg_content = data; log::info!( "message of {:?} received, before gunzip: {:?}", - relay.tx_id, + relay.transaction.tx_id, msg_content.len(), ); let this = JsValue::null(); let msg_content = message::decode_gzip_data(&Bytes::from(data.to_vec())).unwrap(); log::info!( "message of {:?} received, after gunzip: {:?}", - relay.tx_id, + relay.transaction.tx_id, msg_content.len(), ); let http_response: HttpResponse = bincode::deserialize(&msg_content)?; @@ -783,7 +782,7 @@ impl MessageCallbackInstance { impl MessageCallback for MessageCallbackInstance { async fn custom_message( &self, - relay: &MessagePayload, + relay: &MessagePayload, msg: &CustomMessage, ) -> Vec { if msg.0.len() < 2 { @@ -800,11 +799,14 @@ impl MessageCallback for MessageCallbackInstance { return vec![]; } let data = data.unwrap(); - log::debug!("chunk message of {:?} received", relay.tx_id); + log::debug!("chunk message of {:?} received", relay.transaction.tx_id); if let Some(data) = data { data } else { - log::info!("chunk message of {:?} not complete", relay.tx_id); + log::info!( + "chunk message of {:?} not complete", + relay.transaction.tx_id + ); return vec![]; } } else if tag == 0 { @@ -819,7 +821,7 @@ impl MessageCallback for MessageCallbackInstance { vec![] } - async fn builtin_message(&self, relay: &MessagePayload) -> Vec { + async fn builtin_message(&self, relay: &MessagePayload) -> Vec { let this = JsValue::null(); // log::debug!("builtin_message received: {:?}", relay); if let Ok(r) = self diff --git a/node/src/jsonrpc/server.rs b/node/src/jsonrpc/server.rs index 970ed2867..8d901ba0d 100644 --- a/node/src/jsonrpc/server.rs +++ b/node/src/jsonrpc/server.rs @@ -28,7 +28,6 @@ use crate::prelude::rings_core::dht::Did; use crate::prelude::rings_core::message::Decoder; use crate::prelude::rings_core::message::Encoded; use crate::prelude::rings_core::message::Encoder; -use crate::prelude::rings_core::message::Message; use crate::prelude::rings_core::message::MessagePayload; use crate::prelude::rings_core::prelude::vnode::VirtualNode; use crate::prelude::rings_rpc; @@ -205,7 +204,7 @@ pub(crate) async fn answer_offer(params: Params, meta: RpcMeta) -> Result .ok_or_else(|| Error::new(ErrorCode::InvalidParams))?; let encoded: Encoded = >::from(offer_payload_str); let offer_payload = - MessagePayload::::from_encoded(&encoded).map_err(|_| ServerError::DecodeError)?; + MessagePayload::from_encoded(&encoded).map_err(|_| ServerError::DecodeError)?; let (_, answer_payload) = meta .processor @@ -234,7 +233,7 @@ pub(crate) async fn accept_answer(params: Params, meta: RpcMeta) -> Result>::from(answer_payload_str); let answer_payload = - MessagePayload::::from_encoded(&encoded).map_err(|_| ServerError::DecodeError)?; + MessagePayload::from_encoded(&encoded).map_err(|_| ServerError::DecodeError)?; let p: processor::Peer = meta .processor .swarm diff --git a/node/src/native/endpoint/mod.rs b/node/src/native/endpoint/mod.rs index b56f866e0..d1709ddef 100644 --- a/node/src/native/endpoint/mod.rs +++ b/node/src/native/endpoint/mod.rs @@ -111,7 +111,7 @@ async fn jsonrpc_io_handler( .swarm .session_sk() .session() - .verify(&body, sig) + .verify(body.as_bytes(), sig) .map_err(|e| { tracing::debug!("body: {:?}", body); tracing::debug!("signature: {:?}", signature); diff --git a/node/src/processor.rs b/node/src/processor.rs index 66893e645..07961db41 100644 --- a/node/src/processor.rs +++ b/node/src/processor.rs @@ -357,8 +357,8 @@ impl Processor { let encoded_answer: Encoded = >::from(&answer_payload_str); - let answer_payload = MessagePayload::::from_encoded(&encoded_answer) - .map_err(|_| Error::DecodeError)?; + let answer_payload = + MessagePayload::from_encoded(&encoded_answer).map_err(|_| Error::DecodeError)?; let (did, conn) = self .swarm @@ -660,7 +660,7 @@ mod test { impl MessageCallback for MsgCallbackStruct { async fn custom_message( &self, - _ctx: &MessagePayload, + _ctx: &MessagePayload, msg: &CustomMessage, ) -> Vec { let text = unpack_text_message(msg).unwrap(); @@ -669,10 +669,7 @@ mod test { vec![] } - async fn builtin_message( - &self, - _ctx: &MessagePayload, - ) -> Vec { + async fn builtin_message(&self, _ctx: &MessagePayload) -> Vec { vec![] } } diff --git a/node/src/tests/wasm/processor.rs b/node/src/tests/wasm/processor.rs index 2f9453853..0a8d4dee3 100644 --- a/node/src/tests/wasm/processor.rs +++ b/node/src/tests/wasm/processor.rs @@ -42,7 +42,7 @@ struct MsgCallbackStruct { impl MessageCallback for MsgCallbackStruct { async fn custom_message( &self, - _ctx: &MessagePayload, + _ctx: &MessagePayload, msg: &CustomMessage, ) -> Vec { let text = processor::unpack_text_message(msg).unwrap(); @@ -52,7 +52,7 @@ impl MessageCallback for MsgCallbackStruct { vec![] } - async fn builtin_message(&self, _ctx: &MessagePayload) -> Vec { + async fn builtin_message(&self, _ctx: &MessagePayload) -> Vec { vec![] } } diff --git a/rpc/src/jsonrpc_client/client.rs b/rpc/src/jsonrpc_client/client.rs index 681143707..9cb7556f6 100644 --- a/rpc/src/jsonrpc_client/client.rs +++ b/rpc/src/jsonrpc_client/client.rs @@ -80,7 +80,7 @@ impl SimpleClient { if let Some(delegated_sk) = &self.delegated_sk { let sig = delegated_sk - .sign(&request.clone()) + .sign(request.clone().as_bytes()) .map_err(|e| RpcError::Client(format!("Failed to sign request: {}", e)))?; let encoded_sig = base64::encode(sig); req = req.header("X-SIGNATURE", encoded_sig); From acfc83e0083eacdc1ffc9a31e067e227e56d7609 Mon Sep 17 00:00:00 2001 From: magine Date: Wed, 18 Oct 2023 12:17:15 +0800 Subject: [PATCH 2/2] Update docs --- core/src/message/payload.rs | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/core/src/message/payload.rs b/core/src/message/payload.rs index 9d219b1b3..9a276c31a 100644 --- a/core/src/message/payload.rs +++ b/core/src/message/payload.rs @@ -1,3 +1,5 @@ +#![warn(missing_docs)] + use std::io::Write; use std::sync::Arc; @@ -69,6 +71,11 @@ fn hash_transaction(destination: Did, tx_id: uuid::Uuid, data: &[u8]) -> [u8; 32 keccak256(&msg) } +/// All messages transmitted in RingsNetwork should be wrapped by `Transaction`. +/// It additionally offer destination, tx_id and verification. +/// +/// To transmit `Transaction` in RingsNetwork, user should build +/// [MessagePayload] and use [PayloadSender] to send. #[derive(Derivative, Deserialize, Serialize, Clone, PartialEq, Eq)] #[derivative(Debug)] pub struct Transaction { @@ -79,15 +86,14 @@ pub struct Transaction { pub tx_id: uuid::Uuid, /// data pub data: Vec, - /// This field hold a signature from a node, + /// This field holds a signature from a node, /// which is used to prove that the transaction was created by that node. #[derivative(Debug = "ignore")] pub verification: MessageVerification, } -/// All messages transmitted in RingsNetwork should be wrapped by MessagePayload. -/// It additionally offer transaction ID, origin did, relay, previous hop verification, -/// and origin verification. +/// `MessagePayload` is used to transmit data between nodes. +/// The data should be packed by [Transaction]. #[derive(Derivative, Deserialize, Serialize, Clone, PartialEq, Eq)] #[derivative(Debug)] pub struct MessagePayload { @@ -96,13 +102,15 @@ pub struct MessagePayload { /// Relay records the transport path of message. /// And can also help message sender to find the next hop. pub relay: MessageRelay, - /// This field hold a signature from a node, - /// which is used to prove that the transaction was created by that node. + /// This field holds a signature from a node, + /// which is used to prove that payload was created by that node. #[derivative(Debug = "ignore")] pub verification: MessageVerification, } impl Transaction { + /// Wrap data. Will serialize by [bincode::serialize] + /// then sign [MessageVerification] by session_sk. pub fn new( destination: Did, tx_id: uuid::Uuid, @@ -123,6 +131,7 @@ impl Transaction { }) } + /// Deserializes the data field into a `T` instance. pub fn data(&self) -> Result where T: DeserializeOwned { bincode::deserialize(&self.data).map_err(Error::BincodeDeserialize) @@ -130,7 +139,8 @@ impl Transaction { } impl MessagePayload { - /// Create new instance + /// Create new `MessagePayload`. + /// Need [Transaction], [SessionSk] and [MessageRelay]. pub fn new( transaction: Transaction, session_sk: &SessionSk, @@ -149,6 +159,7 @@ impl MessagePayload { }) } + /// Helps to create sending message from data. pub fn new_send( data: T, session_sk: &SessionSk,