From e6375c288ff689ff9ee76dfff8b32e0b5d945f22 Mon Sep 17 00:00:00 2001 From: Stanislav Eliseev Date: Fri, 21 Feb 2025 16:06:45 +0100 Subject: [PATCH] refactor(core): rework ext message limits check --- Cargo.lock | 3 +- Cargo.toml | 1 + block-util/Cargo.toml | 10 + block-util/benches/big_message.rs | 25 ++ block-util/benches/common.rs | 36 +++ block-util/benches/decode_message.rs | 52 ++++ block-util/src/block/block_proof_stuff.rs | 21 +- block-util/src/block/mod.rs | 4 +- block-util/src/lib.rs | 1 + .../src/message}/ext_msg_repr.rs | 228 ++++++++---------- block-util/src/message/mod.rs | 5 + collator/src/mempool/impls/std_impl/parser.rs | 2 +- core/src/blockchain_rpc/service.rs | 6 + core/tests/overlay_server.rs | 13 +- rpc/Cargo.toml | 2 +- rpc/src/endpoint/jrpc/mod.rs | 16 +- rpc/src/endpoint/proto/mod.rs | 7 +- util/Cargo.toml | 3 - util/src/lib.rs | 7 - util/src/serde_helpers.rs | 92 +++++++ 20 files changed, 365 insertions(+), 169 deletions(-) create mode 100644 block-util/benches/big_message.rs create mode 100644 block-util/benches/common.rs create mode 100644 block-util/benches/decode_message.rs rename {util/src/bc => block-util/src/message}/ext_msg_repr.rs (72%) create mode 100644 block-util/src/message/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 73a0897d3..d0086972e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3470,7 +3470,9 @@ version = "0.2.3" dependencies = [ "anyhow", "arc-swap", + "base64", "bytes", + "criterion", "everscale-types", "hex", "parking_lot", @@ -3829,7 +3831,6 @@ dependencies = [ "castaway", "criterion", "dashmap", - "everscale-types", "futures-util", "getip", "humantime", diff --git a/Cargo.toml b/Cargo.toml index f66a95c5f..2804d0eeb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ bytesize = { version = "1.3.0", features = ["serde"] } castaway = "0.2" clap = { version = "4.5.3", features = ["derive"] } crc32c = "0.6" +criterion = "0.5.1" dashmap = "5.5.3" dirs = "5.0.1" ed25519 = "2.0" diff --git a/block-util/Cargo.toml b/block-util/Cargo.toml index 81ca27992..8b4487d89 100644 --- a/block-util/Cargo.toml +++ b/block-util/Cargo.toml @@ -9,6 +9,14 @@ rust-version.workspace = true repository.workspace = true license.workspace = true +[[bench]] +name = "big_message" +harness = false + +[[bench]] +name = "decode_message" +harness = false + [dependencies] # crates.io deps anyhow = { workspace = true } @@ -24,6 +32,8 @@ tl-proto = { workspace = true } tycho-util = { workspace = true } [dev-dependencies] +base64 = { workspace = true } +criterion = { workspace = true } [features] test = [] diff --git a/block-util/benches/big_message.rs b/block-util/benches/big_message.rs new file mode 100644 index 000000000..2ec9b06df --- /dev/null +++ b/block-util/benches/big_message.rs @@ -0,0 +1,25 @@ +use bytes::Bytes; +use criterion::{criterion_group, criterion_main, Criterion}; +use everscale_types::prelude::Boc; +use tycho_block_util::message::ExtMsgRepr; + +use self::common::create_big_message; + +mod common; + +fn big_message_benchmark(c: &mut Criterion) { + let boc = { + let cell = create_big_message().unwrap(); + Boc::encode(&cell) + }; + + let bytes = Bytes::from(boc); + c.bench_function("big-message", |b| { + b.iter(|| { + let _ = ExtMsgRepr::validate(bytes.clone()); + }); + }); +} + +criterion_group!(benches, big_message_benchmark); +criterion_main!(benches); diff --git a/block-util/benches/common.rs b/block-util/benches/common.rs new file mode 100644 index 000000000..3510467b4 --- /dev/null +++ b/block-util/benches/common.rs @@ -0,0 +1,36 @@ +use everscale_types::cell::{Cell, CellBuilder, CellSliceRange}; +use everscale_types::models::{MsgInfo, OwnedMessage}; +use tycho_block_util::message::ExtMsgRepr; + +pub fn create_big_message() -> anyhow::Result { + let mut count = 0; + let body = make_big_tree(8, &mut count, ExtMsgRepr::MAX_MSG_CELLS as u16 - 100); + + let body_range = CellSliceRange::full(body.as_ref()); + + let cell = CellBuilder::build_from(OwnedMessage { + info: MsgInfo::ExtIn(Default::default()), + init: None, + body: (body, body_range), + layout: None, + })?; + + Ok(cell) +} + +fn make_big_tree(depth: u8, count: &mut u16, target: u16) -> Cell { + *count += 1; + + if depth == 0 { + CellBuilder::build_from(*count).unwrap() + } else { + let mut b = CellBuilder::new(); + for _ in 0..4 { + if *count < target { + b.store_reference(make_big_tree(depth - 1, count, target)) + .unwrap(); + } + } + b.build().unwrap() + } +} diff --git a/block-util/benches/decode_message.rs b/block-util/benches/decode_message.rs new file mode 100644 index 000000000..f8b333ad8 --- /dev/null +++ b/block-util/benches/decode_message.rs @@ -0,0 +1,52 @@ +use base64::prelude::Engine as _; +use criterion::{criterion_group, criterion_main, Criterion}; +use everscale_types::boc::Boc; +use everscale_types::cell::{Cell, CellTreeStats}; +use everscale_types::models::MsgInfo; +use everscale_types::prelude::{CellFamily, Load}; +use tycho_block_util::message::MsgStorageStat; + +use self::common::create_big_message; + +mod common; + +fn decode_benchmark(c: &mut Criterion) { + let boc = { + let cell = create_big_message().unwrap(); + Boc::encode_base64(&cell) + }; + + fn decode_base64_impl(data: &[u8]) -> Result, base64::DecodeError> { + base64::engine::general_purpose::STANDARD.decode(data) + } + + c.bench_function("decode-base64", |b| { + b.iter(|| decode_base64_impl(boc.as_ref()).unwrap()); + }); + + let x = decode_base64_impl(boc.as_ref()).unwrap(); + + c.bench_function("boc-decode-ext-base64", |b| { + b.iter(|| Boc::decode_ext(x.as_ref(), Cell::empty_context())); + }); + + let result = Boc::decode_ext(x.as_ref(), Cell::empty_context()).unwrap(); + + c.bench_function("owned-message-load", |b| { + b.iter(|| MsgInfo::load_from(&mut result.as_slice().unwrap()).unwrap()); + }); + let cs = &mut result.as_slice().unwrap(); + let _ = MsgInfo::load_from(cs).unwrap(); + + c.bench_function("traverse", |b| { + b.iter(|| { + MsgStorageStat::check_slice(cs, 2, CellTreeStats { + bit_count: 1 << 21, + cell_count: 1 << 13, + }) + }); + }); +} + +criterion_group!(benches, decode_benchmark); +criterion_main!(benches); diff --git a/block-util/src/block/block_proof_stuff.rs b/block-util/src/block/block_proof_stuff.rs index 04787a52a..6375d0fb4 100644 --- a/block-util/src/block/block_proof_stuff.rs +++ b/block-util/src/block/block_proof_stuff.rs @@ -20,16 +20,6 @@ pub struct BlockProofStuff { impl BlockProofStuff { #[cfg(any(test, feature = "test"))] pub fn new_empty(block_id: &BlockId) -> Self { - use everscale_types::merkle::MerkleUpdate; - - struct AlwaysInclude; - - impl MerkleFilter for AlwaysInclude { - fn check(&self, _: &HashBytes) -> FilterAction { - FilterAction::Include - } - } - let block_info = BlockInfo { shard: block_id.shard, seqno: block_id.seqno, @@ -240,7 +230,7 @@ impl BlockProofStuff { anyhow::ensure!( &signatures.consensus_info == mc_consensus_info, - "block consensus info does not match master state consensus info (found: {:?}, expected: {:?}", + "block consensus info does not match master state consensus info (found: {:?}, expected: {:?}", signatures.consensus_info, mc_consensus_info, ); @@ -549,3 +539,12 @@ impl ValidatorSubsetInfo { }) } } + +// TODO: Move into `types`. +pub struct AlwaysInclude; + +impl MerkleFilter for AlwaysInclude { + fn check(&self, _: &HashBytes) -> FilterAction { + FilterAction::Include + } +} diff --git a/block-util/src/block/mod.rs b/block-util/src/block/mod.rs index e44c25aa9..b7d5b5b26 100644 --- a/block-util/src/block/mod.rs +++ b/block-util/src/block/mod.rs @@ -1,7 +1,7 @@ pub use self::block_id_ext::{calc_next_block_id_short, BlockIdExt, BlockIdRelation}; pub use self::block_proof_stuff::{ - check_with_master_state, check_with_prev_key_block_proof, BlockProofStuff, BlockProofStuffAug, - ValidatorSubsetInfo, + check_with_master_state, check_with_prev_key_block_proof, AlwaysInclude, BlockProofStuff, + BlockProofStuffAug, ValidatorSubsetInfo, }; pub use self::block_stuff::{BlockStuff, BlockStuffAug}; pub use self::top_blocks::{ShardHeights, TopBlocks, TopBlocksShortIdsIter}; diff --git a/block-util/src/lib.rs b/block-util/src/lib.rs index 0ea3f82b7..0c1af5a5e 100644 --- a/block-util/src/lib.rs +++ b/block-util/src/lib.rs @@ -2,6 +2,7 @@ pub mod archive; pub mod block; pub mod config; pub mod dict; +pub mod message; pub mod queue; pub mod state; pub mod tl; diff --git a/util/src/bc/ext_msg_repr.rs b/block-util/src/message/ext_msg_repr.rs similarity index 72% rename from util/src/bc/ext_msg_repr.rs rename to block-util/src/message/ext_msg_repr.rs index 059c5c809..4b94e37c1 100644 --- a/util/src/bc/ext_msg_repr.rs +++ b/block-util/src/message/ext_msg_repr.rs @@ -1,15 +1,19 @@ use std::cell::RefCell; -use base64::prelude::{Engine as _, BASE64_STANDARD}; use everscale_types::cell::CellTreeStats; use everscale_types::error::Error; -use everscale_types::models::{IntAddr, MessageLayout, MsgInfo, OwnedMessage, StateInit}; +use everscale_types::models::{IntAddr, MsgInfo, StateInit}; use everscale_types::prelude::*; -use serde::de::Error as _; -use serde::{Deserialize, Deserializer, Serializer}; - -use crate::serde_helpers::BorrowedStr; -use crate::FastHashMap; +use tycho_util::FastHashMap; + +pub async fn validate_external_message(body: &bytes::Bytes) -> Result<(), InvalidExtMsg> { + if body.len() > ExtMsgRepr::BOUNDARY_BOC_SIZE { + let body = body.clone(); + tycho_util::sync::rayon_run_fifo(move || ExtMsgRepr::validate(&body)).await + } else { + ExtMsgRepr::validate(body) + } +} pub struct ExtMsgRepr; @@ -19,60 +23,20 @@ impl ExtMsgRepr { pub const MAX_ALLOWED_MERKLE_DEPTH: u8 = 2; pub const MAX_MSG_BITS: u64 = 1 << 21; pub const MAX_MSG_CELLS: u64 = 1 << 13; + pub const BOUNDARY_BOC_SIZE: usize = 1 << 12; pub const ALLOWED_WORKCHAINS: std::ops::RangeInclusive = -1..=0; - // === Serde methods === - - pub fn serialize(msg: &OwnedMessage, serializer: S) -> Result - where - S: Serializer, - { - BocRepr::serialize(msg, serializer) - } - - pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> - where - D: Deserializer<'de>, - { - let BorrowedStr(str) = as Deserialize>::deserialize(deserializer)?; - - let decoded_len = base64::decoded_len_estimate(str.len()); - if decoded_len > Self::MAX_BOC_SIZE { - return Err(D::Error::invalid_length(decoded_len, &"less than 64 kB")); - } - - let bytes = BASE64_STANDARD - .decode(str.as_ref()) - .map_err(D::Error::custom)?; - drop(str); - - Self::decode(bytes).map_err(D::Error::custom) - } - // === General methods === - pub fn decode_base64>(base64: T) -> anyhow::Result> { - let decoded_len = base64::decoded_len_estimate(base64.as_ref().len()); - anyhow::ensure!( - decoded_len <= Self::MAX_BOC_SIZE, - InvalidExtMsg::BocSizeExceeded - ); - - let bytes = BASE64_STANDARD.decode(base64.as_ref())?; - drop(base64); - - Self::decode(bytes).map_err(Into::into) - } - - pub fn decode>(bytes: T) -> Result, InvalidExtMsg> { + pub fn validate>(bytes: T) -> Result<(), InvalidExtMsg> { // Apply limits to the encoded BOC. if bytes.as_ref().len() > Self::MAX_BOC_SIZE { return Err(InvalidExtMsg::BocSizeExceeded); } // Decode BOC. - let msg_root = Boc::decode(bytes)?; + let msg_root = Self::boc_decode_with_limit(bytes.as_ref(), Self::MAX_MSG_CELLS)?; // Cell must not contain any suspicious pruned branches not wrapped into merkle stuff. if msg_root.level() != 0 { @@ -121,48 +85,48 @@ impl ExtMsgRepr { } // Process message state init. - let mut layout = MessageLayout { - init_to_cell: false, - body_to_cell: false, - }; - let init = if cs.load_bit()? { - Some(if cs.load_bit()? { + if cs.load_bit()? { + if cs.load_bit()? { // State init as reference. - layout.init_to_cell = true; - cs.load_reference().and_then(|c| c.parse::()) + cs.load_reference().and_then(|c| c.parse::())?; } else { // Inline state init. - StateInit::load_from(&mut cs) - }?) - } else { - None - }; + StateInit::load_from(&mut cs)?; + } + } // Process message body. - let body = if cs.load_bit()? { - // Body as cell. - layout.body_to_cell = true; - let body_cell = cs.load_reference_cloned()?; - - // Message must not contain anything else. - if !cs.is_empty() { + if cs.load_bit()? { + // Message must not contain anything other than body as cell. + if !cs.is_data_empty() || cs.size_refs() != 1 { return Err(InvalidExtMsg::InvalidMessage(Error::InvalidData)); } + } - let body_range = CellSliceRange::full(body_cell.as_ref()); - (body_cell, body_range) - } else { - // Inline body. - let body_range = cs.range(); - (msg_root, body_range) - }; + Ok(()) + } + + fn boc_decode_with_limit(data: &[u8], max_cells: u64) -> Result { + use everscale_types::boc::de::{self, Options}; + + let header = everscale_types::boc::de::BocHeader::decode(data, &Options { + max_roots: Some(1), + min_roots: Some(1), + })?; - Ok(Box::new(OwnedMessage { - info, - init, - body, - layout: Some(layout), - })) + // Optimistic check based on just cell data ranges. + if header.cells().len() as u64 > max_cells { + return Err(InvalidExtMsg::MsgSizeExceeded); + } + + if let Some(&root) = header.roots().first() { + let cells = header.finalize(Cell::empty_context())?; + if let Some(root) = cells.get(root) { + return Ok(root); + } + } + + Err(InvalidExtMsg::BocError(de::Error::RootCellNotFound)) } } @@ -182,7 +146,7 @@ pub enum InvalidExtMsg { MsgSizeExceeded, } -struct MsgStorageStat<'a> { +pub struct MsgStorageStat<'a> { visited: &'a mut FastHashMap<&'static HashBytes, u8>, limits: CellTreeStats, max_merkle_depth: u8, @@ -198,7 +162,7 @@ impl<'a> MsgStorageStat<'a> { ); } - fn check_slice<'c: 'a>( + pub fn check_slice<'c: 'a>( cs: &CellSlice<'c>, max_merkle_depth: u8, limits: CellTreeStats, @@ -273,17 +237,18 @@ impl<'a> MsgStorageStat<'a> { #[cfg(test)] mod test { use everscale_types::error::Error; - use everscale_types::merkle::{FilterAction, MerkleFilter, MerkleProof}; - use everscale_types::models::{ExtOutMsgInfo, IntMsgInfo}; + use everscale_types::merkle::MerkleProof; + use everscale_types::models::{ExtOutMsgInfo, IntMsgInfo, MessageLayout, OwnedMessage}; use super::*; + use crate::block::AlwaysInclude; #[test] fn fits_into_limits() -> anyhow::Result<()> { #[track_caller] fn unwrap_msg(cell: Cell) { let boc = Boc::encode(cell); - ExtMsgRepr::decode(boc).unwrap(); + ExtMsgRepr::validate(boc).unwrap(); } // Simple message. @@ -340,7 +305,7 @@ mod test { #[track_caller] fn expect_err(cell: Cell) -> InvalidExtMsg { let boc = Boc::encode(cell); - ExtMsgRepr::decode(boc).unwrap_err() + ExtMsgRepr::validate(boc).unwrap_err() } // Garbage. @@ -437,49 +402,60 @@ mod test { // Too big message. { - let mut count = 0; - let body = make_big_tree(8, &mut count, ExtMsgRepr::MAX_MSG_CELLS as u16 + 10); - let body_range = CellSliceRange::full(body.as_ref()); - - let cell = CellBuilder::build_from(OwnedMessage { - info: MsgInfo::ExtIn(Default::default()), - init: None, - body: (body, body_range), - layout: None, - })?; - + let cell = exceed_big_message()?; assert!(matches!(expect_err(cell), InvalidExtMsg::MsgSizeExceeded)); } // Too big merkle depth. { - let leaf_proof = MerkleProof::create(Cell::empty_cell_ref(), AlwaysInclude) - .build() - .and_then(CellBuilder::build_from)?; + let cell = create_deep_merkle()?; + assert!(matches!(expect_err(cell), InvalidExtMsg::MsgSizeExceeded)); + } - let inner_proof = MerkleProof::create(leaf_proof.as_ref(), AlwaysInclude) - .build() - .and_then(CellBuilder::build_from)?; + Ok(()) + } - let body = MerkleProof::create(inner_proof.as_ref(), AlwaysInclude) - .build() - .and_then(CellBuilder::build_from)?; - let body_range = CellSliceRange::full(body.as_ref()); + fn exceed_big_message() -> anyhow::Result { + let mut count = 0; + let body = make_big_tree(8, &mut count, ExtMsgRepr::MAX_MSG_CELLS as u16 + 100); - let cell = CellBuilder::build_from(OwnedMessage { - info: MsgInfo::ExtIn(Default::default()), - init: None, - body: (body, body_range), - layout: Some(MessageLayout { - body_to_cell: true, - init_to_cell: false, - }), - })?; + let body_range = CellSliceRange::full(body.as_ref()); - assert!(matches!(expect_err(cell), InvalidExtMsg::MsgSizeExceeded)); - } + let cell = CellBuilder::build_from(OwnedMessage { + info: MsgInfo::ExtIn(Default::default()), + init: None, + body: (body, body_range), + layout: None, + })?; - Ok(()) + Ok(cell) + } + + fn create_deep_merkle() -> anyhow::Result { + let leaf_proof = MerkleProof::create(Cell::empty_cell_ref(), AlwaysInclude) + .build() + .and_then(CellBuilder::build_from)?; + + let inner_proof = MerkleProof::create(leaf_proof.as_ref(), AlwaysInclude) + .build() + .and_then(CellBuilder::build_from)?; + + let body = MerkleProof::create(inner_proof.as_ref(), AlwaysInclude) + .build() + .and_then(CellBuilder::build_from)?; + let body_range = CellSliceRange::full(body.as_ref()); + + let cell = CellBuilder::build_from(OwnedMessage { + info: MsgInfo::ExtIn(Default::default()), + init: None, + body: (body, body_range), + layout: Some(MessageLayout { + body_to_cell: true, + init_to_cell: false, + }), + })?; + + Ok(cell) } fn make_big_tree(depth: u8, count: &mut u16, target: u16) -> Cell { @@ -498,12 +474,4 @@ mod test { b.build().unwrap() } } - - struct AlwaysInclude; - - impl MerkleFilter for AlwaysInclude { - fn check(&self, _: &HashBytes) -> FilterAction { - FilterAction::Include - } - } } diff --git a/block-util/src/message/mod.rs b/block-util/src/message/mod.rs new file mode 100644 index 000000000..f8baf75db --- /dev/null +++ b/block-util/src/message/mod.rs @@ -0,0 +1,5 @@ +pub use self::ext_msg_repr::{ + validate_external_message, ExtMsgRepr, InvalidExtMsg, MsgStorageStat, +}; + +mod ext_msg_repr; diff --git a/collator/src/mempool/impls/std_impl/parser.rs b/collator/src/mempool/impls/std_impl/parser.rs index 94f27ad6c..faf77c51c 100644 --- a/collator/src/mempool/impls/std_impl/parser.rs +++ b/collator/src/mempool/impls/std_impl/parser.rs @@ -5,7 +5,7 @@ use everscale_types::boc::Boc; use everscale_types::models::MsgInfo; use everscale_types::prelude::Load; use rayon::iter::{IntoParallelIterator, ParallelIterator}; -use tycho_util::bc::ExtMsgRepr; +use tycho_block_util::message::ExtMsgRepr; use tycho_util::metrics::HistogramGuard; use crate::mempool::impls::std_impl::deduplicator::Deduplicator; diff --git a/core/src/blockchain_rpc/service.rs b/core/src/blockchain_rpc/service.rs index 4b1d8e2df..6254688b4 100644 --- a/core/src/blockchain_rpc/service.rs +++ b/core/src/blockchain_rpc/service.rs @@ -6,6 +6,7 @@ use bytes::{Buf, Bytes}; use everscale_types::models::BlockId; use futures_util::Future; use serde::{Deserialize, Serialize}; +use tycho_block_util::message::validate_external_message; use tycho_network::{try_handle_prefix, InboundRequestMeta, Response, Service, ServiceRequest}; use tycho_storage::{ArchiveId, BlockConnection, KeyBlocksDirection, PersistentStateKind, Storage}; use tycho_util::futures::BoxFutureOrNoop; @@ -325,6 +326,11 @@ impl Service for BlockchainRpcService { metrics::counter!("tycho_rpc_broadcast_external_message_rx_bytes_total") .increment(req.body.len() as u64); BoxFutureOrNoop::future(async move { + if let Err(e) = validate_external_message(&req.body).await { + tracing::debug!("invalid external message: {e:?}"); + return; + } + inner .broadcast_listener .handle_message(req.metadata, req.body) diff --git a/core/tests/overlay_server.rs b/core/tests/overlay_server.rs index 1fe09c38b..ba9aaacd0 100644 --- a/core/tests/overlay_server.rs +++ b/core/tests/overlay_server.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Result; -use everscale_types::boc::Boc; -use everscale_types::models::{BlockId, ShardIdent}; +use everscale_types::boc::{Boc, BocRepr}; +use everscale_types::models::{BlockId, ExtInMsgInfo, OwnedMessage, ShardIdent}; use tycho_block_util::block::{BlockProofStuff, BlockStuff}; use tycho_block_util::queue::QueueDiffStuff; use tycho_block_util::state::ShardStateStuff; @@ -134,9 +134,16 @@ async fn overlay_server_msg_broadcast() -> Result<()> { tokio::time::sleep(Duration::from_secs(1)).await; tracing::info!("broadcasting messages..."); + let msg = BocRepr::encode(OwnedMessage { + info: ExtInMsgInfo::default().into(), + init: None, + body: Default::default(), + layout: None, + })?; + for node in &nodes { node.blockchain_client - .broadcast_external_message(b"hello world") + .broadcast_external_message(&msg) .await; } diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index df09bcd18..5a97ace3c 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -33,7 +33,7 @@ tracing = { workspace = true } tycho-block-util = { workspace = true } tycho-core = { workspace = true } tycho-storage = { workspace = true } -tycho-util = { workspace = true, features = ["bc"] } +tycho-util = { workspace = true } [dev-dependencies] tempfile = { workspace = true } diff --git a/rpc/src/endpoint/jrpc/mod.rs b/rpc/src/endpoint/jrpc/mod.rs index 31f9a4664..481fd2c59 100644 --- a/rpc/src/endpoint/jrpc/mod.rs +++ b/rpc/src/endpoint/jrpc/mod.rs @@ -9,9 +9,10 @@ use everscale_types::models::*; use everscale_types::prelude::*; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; +use tycho_block_util::message::{validate_external_message, ExtMsgRepr}; use tycho_storage::{CodeHashesIter, TransactionsIterBuilder}; use tycho_util::metrics::HistogramGuard; -use tycho_util::{bc, serde_helpers}; +use tycho_util::serde_helpers::{self, Base64BytesWithLimit}; pub use self::cache::JrpcEndpointCache; use self::extractor::{declare_jrpc_method, Jrpc, JrpcErrorResponse, JrpcOkResponse}; @@ -66,15 +67,16 @@ pub async fn route(State(state): State, req: Jrpc) -> Response } } MethodParams::SendMessage(p) => { - let Ok(data) = BocRepr::encode(p.message) else { + if let Err(e) = validate_external_message(&p.message).await { return JrpcErrorResponse { id: Some(req.id), code: INVALID_BOC_CODE, - message: Cow::Borrowed("invalid message BOC"), + message: e.to_string().into(), } .into_response(); - }; - state.broadcast_external_message(&data).await; + } + + state.broadcast_external_message(&p.message).await; ok_to_response(req.id, ()) } MethodParams::GetLibraryCell(p) => { @@ -198,8 +200,8 @@ impl<'de> Deserialize<'de> for EmptyParams { #[derive(Debug, Deserialize)] pub struct SendMessageRequest { - #[serde(with = "bc::ExtMsgRepr")] - pub message: Box, + #[serde(with = "Base64BytesWithLimit::<{ ExtMsgRepr::MAX_BOC_SIZE }>")] + pub message: bytes::Bytes, } #[derive(Debug, Deserialize)] diff --git a/rpc/src/endpoint/proto/mod.rs b/rpc/src/endpoint/proto/mod.rs index f16d035da..9e3129db3 100644 --- a/rpc/src/endpoint/proto/mod.rs +++ b/rpc/src/endpoint/proto/mod.rs @@ -8,7 +8,7 @@ use bytes::Bytes; use everscale_types::cell::HashBytes; use everscale_types::models::*; use everscale_types::prelude::*; -use tycho_util::bc::ExtMsgRepr; +use tycho_block_util::message::validate_external_message; pub use self::cache::ProtoEndpointCache; use self::protos::rpc::{self, request, response, Request}; @@ -66,13 +66,14 @@ pub async fn route(State(state): State, Protobuf(req): Protobuf { - if let Err(e) = ExtMsgRepr::decode(&p.message) { + if let Err(e) = validate_external_message(&p.message).await { return ProtoErrorResponse { code: INVALID_BOC_CODE, message: e.to_string().into(), } .into_response(); - }; + } + state.broadcast_external_message(&p.message).await; ok_to_response(response::Result::SendMessage(())) } diff --git a/util/Cargo.toml b/util/Cargo.toml index 9ccd7dab9..611d648f7 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -37,13 +37,11 @@ tracing-subscriber = { workspace = true, optional = true } zstd-safe = { workspace = true, features = ["std", "zstdmt"] } zstd-sys = { workspace = true } -everscale-types = { workspace = true, optional = true } metrics-exporter-prometheus = { workspace = true, optional = true } tikv-jemalloc-ctl = { workspace = true, optional = true } [dev-dependencies] criterion = "0.5.1" -everscale-types = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true, features = ["time", "sync", "rt-multi-thread", "macros"] } tracing-subscriber = { workspace = true, features = ["env-filter"] } @@ -60,7 +58,6 @@ cli = [ "metrics-exporter-prometheus", "tikv-jemalloc-ctl", ] -bc = ["dep:everscale-types"] [[bench]] name = "p2" diff --git a/util/src/lib.rs b/util/src/lib.rs index dd91558bd..2409d44b6 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -10,13 +10,6 @@ pub mod time; pub mod tl; -#[cfg(any(test, feature = "bc"))] -pub mod bc { - pub use ext_msg_repr::ExtMsgRepr; - - mod ext_msg_repr; -} - pub mod futures { pub use self::box_future_or_noop::BoxFutureOrNoop; pub use self::join_task::JoinTask; diff --git a/util/src/serde_helpers.rs b/util/src/serde_helpers.rs index 0fc953edd..7af2a6bc2 100644 --- a/util/src/serde_helpers.rs +++ b/util/src/serde_helpers.rs @@ -4,6 +4,8 @@ use std::path::Path; use std::str::FromStr; use anyhow::Result; +use base64::prelude::{Engine as _, BASE64_STANDARD}; +use bytes::Bytes; use serde::de::{Error, Expected, Visitor}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -190,6 +192,96 @@ pub mod humantime { } } +pub struct Base64BytesWithLimit; + +impl Base64BytesWithLimit { + pub fn serialize(value: &[u8], serializer: S) -> Result + where + S: serde::Serializer, + { + if serializer.is_human_readable() { + let base64 = BASE64_STANDARD.encode(value); + serializer.serialize_str(&base64) + } else { + serializer.serialize_bytes(value) + } + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct BytesVisitorWithLimit; + + impl<'de, const LIMIT: usize> Visitor<'de> for BytesVisitorWithLimit { + type Value = Bytes; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("byte array") + } + + #[inline] + fn visit_seq(self, mut seq: V) -> Result + where + V: serde::de::SeqAccess<'de>, + { + 'valid: { + let hint = seq.size_hint().unwrap_or(0); + if hint > LIMIT { + break 'valid; + } + + let len = std::cmp::min(hint, 4096); + let mut values: Vec = Vec::with_capacity(len); + + while let Some(value) = seq.next_element()? { + if values.len() > LIMIT { + break 'valid; + } + + values.push(value); + } + + return Ok(Bytes::from(values)); + } + + Err(Error::custom("slice is too big")) + } + + #[inline] + fn visit_bytes(self, v: &[u8]) -> Result { + if v.len() > LIMIT { + return Err(Error::custom("slice is too big")); + } + Ok(Bytes::copy_from_slice(v)) + } + + #[inline] + fn visit_byte_buf(self, v: Vec) -> Result { + if v.len() > LIMIT { + return Err(Error::custom("slice is too big")); + } + Ok(Bytes::from(v)) + } + } + + if deserializer.is_human_readable() { + let BorrowedStr(s) = <_>::deserialize(deserializer)?; + if base64::decoded_len_estimate(s.len()) >= LIMIT { + return Err(Error::custom("slice is too big")); + } + + let v = BASE64_STANDARD + .decode(s.as_ref()) + .map_err(|_e| D::Error::custom("invalid base64"))?; + + Ok(Bytes::from(v)) + } else { + deserializer.deserialize_bytes(BytesVisitorWithLimit::) + } + } +} + pub mod string { use super::*;