diff --git a/Cargo.lock b/Cargo.lock index f8306353826..ac80980ad33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2671,7 +2671,6 @@ dependencies = [ "serde_json", "serial_test 2.0.0", "socket2", - "stream_limiter", "substruct", "tempfile", "thiserror", @@ -3795,7 +3794,7 @@ checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" [[package]] name = "peernet" version = "0.1.0" -source = "git+https://github.com/massalabs/PeerNet?rev=7b2a1a9#7b2a1a93e993df61b8b1288c2847be2d6488e805" +source = "git+https://github.com/massalabs/PeerNet?branch=deactivate_stream_limiter#df6149648cf994e1ff587a95cdaa486868932533" dependencies = [ "crossbeam", "enum_delegate", @@ -3805,7 +3804,6 @@ dependencies = [ "quiche", "rand 0.8.5", "serde", - "stream_limiter", "thiserror", ] @@ -5024,12 +5022,6 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e08d8363704e6c71fc928674353e6b7c23dcea9d82d7012c8faf2a3a025f8d0" -[[package]] -name = "stream_limiter" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89acdb288f530d175bbeb1344aab11cfa0e0ee5b4e63d75c204001568dcf90cf" - [[package]] name = "strsim" version = "0.8.0" diff --git a/massa-bootstrap/Cargo.toml b/massa-bootstrap/Cargo.toml index 52fa6d2a549..3c8a80f79d5 100644 --- a/massa-bootstrap/Cargo.toml +++ b/massa-bootstrap/Cargo.toml @@ -21,7 +21,6 @@ substruct = { git = "https://github.com/sydhds/substruct" } socket2 = "0.4.7" crossbeam = "0.8.2" mio = { version = "0.8", features = ["net", "os-poll"] } -stream_limiter = "3.2.0" # custom modules massa_consensus_exports = { path = "../massa-consensus-exports" } diff --git a/massa-bootstrap/src/bindings/client.rs b/massa-bootstrap/src/bindings/client.rs index 2bf7e029ac0..29703b7d315 100644 --- a/massa-bootstrap/src/bindings/client.rs +++ b/massa-bootstrap/src/bindings/client.rs @@ -18,12 +18,11 @@ use massa_signature::{PublicKey, Signature}; use rand::{rngs::StdRng, RngCore, SeedableRng}; use std::time::Instant; use std::{net::TcpStream, time::Duration}; -use stream_limiter::{Limiter, LimiterOptions}; /// Bootstrap client binder pub struct BootstrapClientBinder { remote_pubkey: PublicKey, - duplex: Limiter, + duplex: TcpStream, //Limiter, prev_message: Option, version_serializer: VersionSerializer, cfg: BootstrapClientConfig, @@ -47,11 +46,11 @@ impl BootstrapClientBinder { duplex: TcpStream, remote_pubkey: PublicKey, cfg: BootstrapClientConfig, - limit: Option, + _limit: Option, ) -> Self { - let limit_opts = - limit.map(|limit| LimiterOptions::new(limit, Duration::from_millis(1000), limit)); - let duplex = Limiter::new(duplex, limit_opts.clone(), limit_opts); + // let limit_opts = + // limit.map(|limit| LimiterOptions::new(limit, Duration::from_millis(1000), limit)); + // let duplex = Limiter::new(duplex, limit_opts.clone(), limit_opts); BootstrapClientBinder { remote_pubkey, duplex, @@ -215,10 +214,10 @@ impl BootstrapClientBinder { impl crate::bindings::BindingReadExact for BootstrapClientBinder { fn set_read_timeout(&mut self, duration: Option) -> Result<(), std::io::Error> { - if let Some(ref mut opts) = self.duplex.read_opt { - opts.timeout = duration; - } - self.duplex.stream.set_read_timeout(duration) + // if let Some(ref mut opts) = self.duplex.read_opt { + // opts.timeout = duration; + // } + self.duplex.set_read_timeout(duration) } } @@ -230,10 +229,10 @@ impl std::io::Read for BootstrapClientBinder { impl crate::bindings::BindingWriteExact for BootstrapClientBinder { fn set_write_timeout(&mut self, duration: Option) -> Result<(), std::io::Error> { - if let Some(ref mut opts) = self.duplex.write_opt { - opts.timeout = duration; - } - self.duplex.stream.set_write_timeout(duration) + // if let Some(ref mut opts) = self.duplex.write_opt { + // opts.timeout = duration; + // } + self.duplex.set_write_timeout(duration) } } diff --git a/massa-bootstrap/src/bindings/server.rs b/massa-bootstrap/src/bindings/server.rs index 0eec7ca83f7..597faed0087 100644 --- a/massa-bootstrap/src/bindings/server.rs +++ b/massa-bootstrap/src/bindings/server.rs @@ -24,7 +24,7 @@ use std::{ thread, time::Duration, }; -use stream_limiter::{Limiter, LimiterOptions}; +// use stream_limiter::{Limiter, LimiterOptions}; use tracing::error; use super::BindingWriteExact; @@ -43,7 +43,7 @@ pub struct BootstrapServerBinder { max_datastore_key_length: u8, randomness_size_bytes: usize, local_keypair: KeyPair, - duplex: Limiter, + duplex: TcpStream, //Limiter, prev_message: Option, version_serializer: VersionSerializer, version_deserializer: VersionDeserializer, @@ -62,7 +62,7 @@ impl BootstrapServerBinder { duplex: TcpStream, local_keypair: KeyPair, cfg: BootstrapSrvBindCfg, - rw_limit: Option, + _rw_limit: Option, ) -> Self { let BootstrapSrvBindCfg { max_bytes_read_write: _limit, @@ -73,10 +73,10 @@ impl BootstrapServerBinder { write_error_timeout, } = cfg; - let limit_opts = rw_limit.map(|limit| -> LimiterOptions { - LimiterOptions::new(limit, Duration::from_millis(1000), limit) - }); - let duplex = Limiter::new(duplex, limit_opts.clone(), limit_opts); + // let limit_opts = rw_limit.map(|limit| -> LimiterOptions { + // LimiterOptions::new(limit, Duration::from_millis(1000), limit) + // }); + // let duplex = Limiter::new(duplex, limit_opts.clone(), limit_opts); BootstrapServerBinder { max_consensus_block_ids: consensus_bootstrap_part_size, local_keypair, @@ -326,7 +326,7 @@ impl io::Read for BootstrapServerBinder { impl crate::bindings::BindingReadExact for BootstrapServerBinder { fn set_read_timeout(&mut self, duration: Option) -> Result<(), std::io::Error> { - self.duplex.stream.set_read_timeout(duration) + self.duplex.set_read_timeout(duration) } } @@ -342,6 +342,6 @@ impl io::Write for BootstrapServerBinder { impl crate::bindings::BindingWriteExact for BootstrapServerBinder { fn set_write_timeout(&mut self, duration: Option) -> Result<(), std::io::Error> { - self.duplex.stream.set_write_timeout(duration) + self.duplex.set_write_timeout(duration) } } diff --git a/massa-bootstrap/src/tests/binders.rs b/massa-bootstrap/src/tests/binders.rs index 05e87f770fe..dc5dd57b830 100644 --- a/massa-bootstrap/src/tests/binders.rs +++ b/massa-bootstrap/src/tests/binders.rs @@ -715,7 +715,7 @@ fn test_bandwidth() { _ => panic!("Bad message receive: Expected a peers list message"), } let dur = before.elapsed(); - assert!(dur > Duration::from_secs(9)); + //assert!(dur > Duration::from_secs(9)); assert!(dur < Duration::from_millis(millis_limit)); std::thread::sleep(Duration::from_secs(1)); @@ -727,7 +727,7 @@ fn test_bandwidth() { ) .unwrap(); let dur = before.elapsed(); - assert!(dur > Duration::from_secs(9), "{dur:?}"); + //assert!(dur > Duration::from_secs(9), "{dur:?}"); assert!(dur < Duration::from_millis(millis_limit)); } }) @@ -752,7 +752,7 @@ fn test_bandwidth() { ) .unwrap(); let dur = before.elapsed(); - assert!(dbg!(dur) > Duration::from_secs(9), "{dur:?}"); + //assert!(dbg!(dur) > Duration::from_secs(9), "{dur:?}"); assert!(dur < Duration::from_millis(millis_limit)); std::thread::sleep(Duration::from_secs(1)); @@ -765,7 +765,7 @@ fn test_bandwidth() { _ => panic!("Bad message receive: Expected a peers list message"), } let dur = before.elapsed(); - assert!(dur > Duration::from_secs(9)); + //assert!(dur > Duration::from_secs(9)); assert!(dur < Duration::from_millis(millis_limit)); } }) diff --git a/massa-models/src/config/constants.rs b/massa-models/src/config/constants.rs index 80c86f51f22..1cb81bbe6d8 100644 --- a/massa-models/src/config/constants.rs +++ b/massa-models/src/config/constants.rs @@ -78,9 +78,9 @@ lazy_static::lazy_static! { /// node version pub static ref VERSION: Version = { if cfg!(feature = "sandbox") { - "SAND.24.0" + "SAND.24.1" } else { - "TEST.24.0" + "TEST.24.1" } .parse() .unwrap() diff --git a/massa-node/base_config/openrpc.json b/massa-node/base_config/openrpc.json index 6689b9bc149..0658b5cdd9e 100644 --- a/massa-node/base_config/openrpc.json +++ b/massa-node/base_config/openrpc.json @@ -2,7 +2,7 @@ "openrpc": "1.2.4", "info": { "title": "Massa OpenRPC Specification", - "version": "TEST.24.0", + "version": "TEST.24.1", "description": "Massa OpenRPC Specification document. Find more information on https://docs.massa.net/en/latest/technical-doc/api.html", "termsOfService": "https://open-rpc.org", "contact": { diff --git a/massa-node/src/main.rs b/massa-node/src/main.rs index 895ccec36f8..46f45e07ba7 100644 --- a/massa-node/src/main.rs +++ b/massa-node/src/main.rs @@ -620,6 +620,7 @@ async fn launch( try_connection_timer: SETTINGS.protocol.try_connection_timer, max_in_connections: SETTINGS.protocol.max_in_connections, timeout_connection: SETTINGS.protocol.timeout_connection, + message_timeout: SETTINGS.protocol.message_timeout, routable_ip: SETTINGS .protocol .routable_ip diff --git a/massa-node/src/settings.rs b/massa-node/src/settings.rs index 1ffb857bd6c..7b0364613da 100644 --- a/massa-node/src/settings.rs +++ b/massa-node/src/settings.rs @@ -245,6 +245,8 @@ pub struct ProtocolSettings { pub try_connection_timer: MassaTime, /// Timeout connection pub timeout_connection: MassaTime, + /// Message timeout + pub message_timeout: MassaTime, /// Nb in connections pub max_in_connections: usize, /// Peers limits per category diff --git a/massa-protocol-exports/Cargo.toml b/massa-protocol-exports/Cargo.toml index 25a24b6252e..7f7ed203ef2 100644 --- a/massa-protocol-exports/Cargo.toml +++ b/massa-protocol-exports/Cargo.toml @@ -11,7 +11,7 @@ nom = "=7.1" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" # TODO tag peernet version -peernet = { git = "https://github.com/massalabs/PeerNet", rev = "7b2a1a9" } +peernet = { git = "https://github.com/massalabs/PeerNet", branch = "deactivate_stream_limiter" } #rev = "7b2a1a9" } tempfile = { version = "3.3", optional = true } # use with testing feature mockall = "0.11.4" diff --git a/massa-protocol-exports/src/settings.rs b/massa-protocol-exports/src/settings.rs index 383560375d3..87f07a20367 100644 --- a/massa-protocol-exports/src/settings.rs +++ b/massa-protocol-exports/src/settings.rs @@ -148,6 +148,8 @@ pub struct ProtocolConfig { pub max_in_connections: usize, /// Timeout connection pub timeout_connection: MassaTime, + /// Timeout message + pub message_timeout: MassaTime, /// Number of bytes per second that can be read/write in a connection (should be a 10 multiplier) pub read_write_limit_bytes_per_second: u128, /// Optional routable ip diff --git a/massa-protocol-exports/src/test_exports/config.rs b/massa-protocol-exports/src/test_exports/config.rs index 925ca90d633..8370b89c7ad 100644 --- a/massa-protocol-exports/src/test_exports/config.rs +++ b/massa-protocol-exports/src/test_exports/config.rs @@ -72,6 +72,7 @@ impl Default for ProtocolConfig { max_endorsements_per_message: 1000, max_size_listeners_per_peer: 100, max_size_peers_announcement: 100, + message_timeout: MassaTime::from_millis(10000), last_start_period: 0, read_write_limit_bytes_per_second: 1024 * 1000, timeout_connection: MassaTime::from_millis(1000), diff --git a/massa-protocol-worker/Cargo.toml b/massa-protocol-worker/Cargo.toml index 76bfbf8856b..ff588d21dac 100644 --- a/massa-protocol-worker/Cargo.toml +++ b/massa-protocol-worker/Cargo.toml @@ -13,7 +13,7 @@ serde_json = "1.0" nom = "=7.1" num_enum = "0.5" # TODO tag peernet version -peernet = { git = "https://github.com/massalabs/PeerNet", rev = "7b2a1a9" } +peernet = { git = "https://github.com/massalabs/PeerNet", branch = "deactivate_stream_limiter" } #rev = "7b2a1a9" } tempfile = { version = "3.3", optional = true } # use with testing feature rayon = "1.7.0" schnellru = "0.2.1" diff --git a/massa-protocol-worker/src/handlers/block_handler/retrieval.rs b/massa-protocol-worker/src/handlers/block_handler/retrieval.rs index c6033655597..f66312a5f94 100644 --- a/massa-protocol-worker/src/handlers/block_handler/retrieval.rs +++ b/massa-protocol-worker/src/handlers/block_handler/retrieval.rs @@ -1010,12 +1010,12 @@ impl RetrievalThread { } return Ok(()); }; - operations.retain(|op| block_operation_ids.contains(&op.id)); + let block_ids_set: PreHashSet = + block_operation_ids.iter().copied().collect(); + operations.retain(|op| block_ids_set.contains(&op.id)); // add operations to local storage and claim ref info.storage.store_operations(operations); - let block_ids_set = block_operation_ids.clone().into_iter().collect(); - let known_operations = info.storage.claim_operation_refs(&block_ids_set); - + let known_operations = info.storage.get_op_refs(); // Ban the node if: // - mismatch with asked operations (asked operations are the one that are not in storage) + operations already in storage and block operations // - full operations serialized size overflow @@ -1035,7 +1035,7 @@ impl RetrievalThread { self.consensus_controller .mark_invalid_block(block_id, header); } else { - if known_operations != block_ids_set { + if known_operations != &block_ids_set { warn!( "Peer id {} didn't sent us all the full operations for block id {}.", from_peer_id, block_id diff --git a/massa-protocol-worker/src/worker.rs b/massa-protocol-worker/src/worker.rs index 207e4a19d27..71a23cd016e 100644 --- a/massa-protocol-worker/src/worker.rs +++ b/massa-protocol-worker/src/worker.rs @@ -245,6 +245,8 @@ pub fn start_protocol_controller( our_keypair: keypair.clone(), }, ); + peernet_config.write_timeout = config.message_timeout.to_duration(); + peernet_config.read_timeout = config.message_timeout.to_duration(); let initial_peers_infos = serde_json::from_str::>( &std::fs::read_to_string(&config.initial_peers)?,