From 568b295f749f354f069cf8b1e81aa3c1685c6a03 Mon Sep 17 00:00:00 2001 From: Ivan Kalinin Date: Fri, 26 Apr 2024 21:18:16 +0200 Subject: [PATCH] refactor(overlay-client): simplify overlay client --- core/src/block_strider/provider.rs | 16 +- core/src/blockchain_client/mod.rs | 131 ++++---- core/src/overlay_client/config.rs | 49 +++ core/src/overlay_client/mod.rs | 317 ++++++++++++++++-- core/src/overlay_client/neighbour.rs | 67 ++-- core/src/overlay_client/neighbours.rs | 17 +- .../overlay_client/public_overlay_client.rs | 250 -------------- core/src/overlay_client/settings.rs | 39 --- core/tests/block_strider.rs | 8 +- core/tests/overlay_client.rs | 53 ++- core/tests/overlay_server.rs | 23 +- 11 files changed, 495 insertions(+), 475 deletions(-) create mode 100644 core/src/overlay_client/config.rs delete mode 100644 core/src/overlay_client/public_overlay_client.rs delete mode 100644 core/src/overlay_client/settings.rs diff --git a/core/src/block_strider/provider.rs b/core/src/block_strider/provider.rs index 7ac728d67..1b77a1a70 100644 --- a/core/src/block_strider/provider.rs +++ b/core/src/block_strider/provider.rs @@ -91,25 +91,25 @@ impl BlockProvider for BlockchainClient { let config = self.config(); loop { - let res = self.get_next_block_full(*prev_block_id).await; + let res = self.get_next_block_full(prev_block_id).await; let block = match res { Ok(res) if matches!(res.data(), BlockFull::Found { .. }) => { let (block_id, data) = match res.data() { BlockFull::Found { block_id, block, .. - } => (*block_id, block), + } => (*block_id, block.clone()), BlockFull::Empty => unreachable!(), }; - match BlockStuff::deserialize_checked(block_id, data) { + match BlockStuff::deserialize_checked(block_id, &data) { Ok(block) => { - res.mark_response(true); - Some(Ok(BlockStuffAug::new(block, data.clone()))) + res.accept(); + Some(Ok(BlockStuffAug::new(block, data))) } Err(e) => { tracing::error!("failed to deserialize block: {:?}", e); - res.mark_response(false); + res.reject(); None } } @@ -135,7 +135,7 @@ impl BlockProvider for BlockchainClient { let config = self.config(); loop { - let res = match self.get_block_full(*block_id).await { + let res = match self.get_block_full(block_id).await { Ok(res) => res, Err(e) => { tracing::error!("failed to get block: {:?}", e); @@ -152,7 +152,7 @@ impl BlockProvider for BlockchainClient { } => match BlockStuff::deserialize_checked(*block_id, data) { Ok(block) => Some(Ok(BlockStuffAug::new(block, data.clone()))), Err(e) => { - res.mark_response(false); + res.accept(); tracing::error!("failed to deserialize block: {:?}", e); tokio::time::sleep(config.get_block_polling_interval).await; continue; diff --git a/core/src/blockchain_client/mod.rs b/core/src/blockchain_client/mod.rs index c70c0aab0..06af585e1 100644 --- a/core/src/blockchain_client/mod.rs +++ b/core/src/blockchain_client/mod.rs @@ -4,63 +4,94 @@ use std::time::Duration; use anyhow::Result; use everscale_types::models::BlockId; -use crate::overlay_client::public_overlay_client::*; +use crate::overlay_client::{PublicOverlayClient, QueryResponse}; use crate::proto::overlay::rpc::*; use crate::proto::overlay::*; +pub struct BlockchainClientConfig { + pub get_next_block_polling_interval: Duration, + pub get_block_polling_interval: Duration, +} + +impl Default for BlockchainClientConfig { + fn default() -> Self { + Self { + get_block_polling_interval: Duration::from_millis(50), + get_next_block_polling_interval: Duration::from_millis(50), + } + } +} + +#[derive(Clone)] +#[repr(transparent)] pub struct BlockchainClient { - client: PublicOverlayClient, + inner: Arc, +} + +struct Inner { + overlay_client: PublicOverlayClient, config: BlockchainClientConfig, } impl BlockchainClient { - pub fn new( - overlay_client: PublicOverlayClient, - config: BlockchainClientConfig, - ) -> Arc { - Arc::new(Self { - client: overlay_client, - config, - }) + pub fn new(overlay_client: PublicOverlayClient, config: BlockchainClientConfig) -> Self { + Self { + inner: Arc::new(Inner { + overlay_client, + config, + }), + } + } + + pub fn overlay_client(&self) -> &PublicOverlayClient { + &self.inner.overlay_client + } + + pub fn config(&self) -> &BlockchainClientConfig { + &self.inner.config } pub async fn get_next_key_block_ids( &self, - block: BlockId, + block: &BlockId, max_size: u32, - ) -> Result> { - let data = self - .client - .query::(GetNextKeyBlockIds { block, max_size }) + ) -> Result> { + let client = &self.inner.overlay_client; + let data = client + .query::<_, KeyBlockIds>(&GetNextKeyBlockIds { + block: *block, + max_size, + }) .await?; Ok(data) } - pub async fn get_block_full(&self, block: BlockId) -> Result> { - let data = self - .client - .query::(GetBlockFull { block }) + pub async fn get_block_full(&self, block: &BlockId) -> Result> { + let client = &self.inner.overlay_client; + let data = client + .query::<_, BlockFull>(GetBlockFull { block: *block }) .await?; Ok(data) } pub async fn get_next_block_full( &self, - prev_block: BlockId, - ) -> Result> { - let data = self - .client - .query::(GetNextBlockFull { prev_block }) + prev_block: &BlockId, + ) -> Result> { + let client = &self.inner.overlay_client; + let data = client + .query::<_, BlockFull>(GetNextBlockFull { + prev_block: *prev_block, + }) .await?; Ok(data) } - pub async fn get_archive_info(&self, mc_seqno: u32) -> Result> { - let data = self - .client - .query::(GetArchiveInfo { mc_seqno }) + pub async fn get_archive_info(&self, mc_seqno: u32) -> Result> { + let client = &self.inner.overlay_client; + let data = client + .query::<_, ArchiveInfo>(GetArchiveInfo { mc_seqno }) .await?; - Ok(data) } @@ -69,10 +100,10 @@ impl BlockchainClient { archive_id: u64, offset: u64, max_size: u32, - ) -> Result> { - let data = self - .client - .query::(GetArchiveSlice { + ) -> Result> { + let client = &self.inner.overlay_client; + let data = client + .query::<_, Data>(GetArchiveSlice { archive_id, offset, max_size, @@ -83,38 +114,20 @@ impl BlockchainClient { pub async fn get_persistent_state_part( &self, - mc_block: BlockId, - block: BlockId, + mc_block: &BlockId, + block: &BlockId, offset: u64, max_size: u64, - ) -> Result> { - let data = self - .client - .query::(GetPersistentStatePart { - block, - mc_block, + ) -> Result> { + let client = &self.inner.overlay_client; + let data = client + .query::<_, PersistentStatePart>(GetPersistentStatePart { + block: *block, + mc_block: *mc_block, offset, max_size, }) .await?; Ok(data) } - - pub fn config(&self) -> &BlockchainClientConfig { - &self.config - } -} - -pub struct BlockchainClientConfig { - pub get_next_block_polling_interval: Duration, - pub get_block_polling_interval: Duration, -} - -impl Default for BlockchainClientConfig { - fn default() -> Self { - Self { - get_block_polling_interval: Duration::from_millis(50), - get_next_block_polling_interval: Duration::from_millis(50), - } - } } diff --git a/core/src/overlay_client/config.rs b/core/src/overlay_client/config.rs new file mode 100644 index 000000000..ee0718f7f --- /dev/null +++ b/core/src/overlay_client/config.rs @@ -0,0 +1,49 @@ +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use tycho_util::serde_helpers; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +#[non_exhaustive] +pub struct PublicOverlayClientConfig { + /// The interval at which neighbours list is updated. + /// + /// Default: 2 minutes. + #[serde(with = "serde_helpers::humantime")] + pub neighbours_update_interval: Duration, + + /// The interval at which current neighbours are pinged. + /// + /// Default: 30 seconds. + #[serde(with = "serde_helpers::humantime")] + pub neighbours_ping_interval: Duration, + + /// The maximum number of neighbours to keep. + /// + /// Default: 5. + pub max_neighbours: usize, + + /// The maximum number of ping tasks to run concurrently. + /// + /// Default: 5. + pub max_ping_tasks: usize, + + /// The default roundtrip time to use when a neighbour is added. + /// + /// Default: 300 ms. + #[serde(with = "serde_helpers::humantime")] + pub default_roundtrip: Duration, +} + +impl Default for PublicOverlayClientConfig { + fn default() -> Self { + Self { + neighbours_update_interval: Duration::from_secs(2 * 60), + neighbours_ping_interval: Duration::from_secs(30), + max_neighbours: 5, + max_ping_tasks: 5, + default_roundtrip: Duration::from_millis(300), + } + } +} diff --git a/core/src/overlay_client/mod.rs b/core/src/overlay_client/mod.rs index 613b1e980..6f43db315 100644 --- a/core/src/overlay_client/mod.rs +++ b/core/src/overlay_client/mod.rs @@ -1,35 +1,306 @@ -use crate::overlay_client::public_overlay_client::PublicOverlayClient; -use std::time::Duration; +use std::sync::Arc; +use std::time::{Duration, Instant}; -pub mod neighbour; -pub mod neighbours; -pub mod public_overlay_client; -pub mod settings; +use anyhow::Result; +use bytes::Bytes; +use tokio::task::AbortHandle; +use tycho_network::{Network, PublicOverlay, Request}; -async fn start_neighbours_ping(client: PublicOverlayClient) { - let mut interval = - tokio::time::interval(Duration::from_millis(client.neighbour_update_interval_ms())); +pub use self::config::PublicOverlayClientConfig; +pub use self::neighbour::{Neighbour, NeighbourStats}; +pub use self::neighbours::Neighbours; - loop { - interval.tick().await; - if let Err(e) = client.ping_random_neighbour().await { - tracing::error!("Failed to ping random neighbour. Error: {e:?}"); +use crate::proto::overlay; + +mod config; +mod neighbour; +mod neighbours; + +#[derive(Clone)] +#[repr(transparent)] +pub struct PublicOverlayClient { + inner: Arc, +} + +impl PublicOverlayClient { + pub fn new( + network: Network, + overlay: PublicOverlay, + config: PublicOverlayClientConfig, + ) -> Self { + let ttl = overlay.entry_ttl_sec(); + + let entries = overlay + .read_entries() + .choose_multiple(&mut rand::thread_rng(), config.max_neighbours) + .map(|entry_data| { + Neighbour::new( + entry_data.entry.peer_id, + entry_data.expires_at(ttl), + &config.default_roundtrip, + ) + }) + .collect::>(); + + let neighbours = Neighbours::new(entries, config.max_neighbours); + + let mut res = Inner { + network, + overlay, + neighbours, + config, + ping_task: None, + update_task: None, + cleanup_task: None, + }; + + // NOTE: Reuse same `Inner` type to avoid introducing a new type for shard state + // NOTE: Clone does not clone the tasks + res.ping_task = Some(tokio::spawn(res.clone().ping_neighbours_task()).abort_handle()); + res.update_task = Some(tokio::spawn(res.clone().update_neighbours_task()).abort_handle()); + res.cleanup_task = Some(tokio::spawn(res.clone().cleanup_neighbours_task()).abort_handle()); + + Self { + inner: Arc::new(res), + } + } + + pub fn config(&self) -> &PublicOverlayClientConfig { + &self.inner.config + } + + pub fn neighbours(&self) -> &Neighbours { + &self.inner.neighbours + } + + pub fn overlay(&self) -> &PublicOverlay { + &self.inner.overlay + } + + pub fn network(&self) -> &Network { + &self.inner.network + } + + pub async fn send(&self, data: R) -> Result<(), Error> + where + R: tl_proto::TlWrite, + { + self.inner.send(data).await + } + + pub async fn query(&self, data: R) -> Result, Error> + where + R: tl_proto::TlWrite, + for<'a> A: tl_proto::TlRead<'a, Repr = tl_proto::Boxed>, + { + self.inner.query(data).await + } +} + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("no active neighbours found")] + NoNeighbours, + #[error("network error: {0}")] + NetworkError(#[source] anyhow::Error), + #[error("invalid response: {0}")] + InvalidResponse(#[source] tl_proto::TlError), + #[error("request failed with code: {0}")] + RequestFailed(u32), +} + +struct Inner { + network: Network, + overlay: PublicOverlay, + neighbours: Neighbours, + config: PublicOverlayClientConfig, + + ping_task: Option, + update_task: Option, + cleanup_task: Option, +} + +impl Clone for Inner { + fn clone(&self) -> Self { + Self { + network: self.network.clone(), + overlay: self.overlay.clone(), + neighbours: self.neighbours.clone(), + config: self.config.clone(), + ping_task: None, + update_task: None, + cleanup_task: None, + } + } +} + +impl Inner { + async fn ping_neighbours_task(self) { + let mut interval = tokio::time::interval(self.config.neighbours_ping_interval); + loop { + interval.tick().await; + + if let Err(e) = self.query::<_, overlay::Pong>(overlay::Ping).await { + tracing::error!("failed to ping random neighbour: {e}"); + } + } + } + + async fn update_neighbours_task(self) { + let ttl = self.overlay.entry_ttl_sec(); + let max_neighbours = self.config.max_neighbours; + let default_roundtrip = self.config.default_roundtrip; + + let mut interval = tokio::time::interval(self.config.neighbours_update_interval); + + loop { + interval.tick().await; + + let active_neighbours = self.neighbours.get_active_neighbours().await.len(); + let neighbours_to_get = max_neighbours + (max_neighbours - active_neighbours); + + let neighbours = { + self.overlay + .read_entries() + .choose_multiple(&mut rand::thread_rng(), neighbours_to_get) + .map(|x| Neighbour::new(x.entry.peer_id, x.expires_at(ttl), &default_roundtrip)) + .collect::>() + }; + self.neighbours.update(neighbours).await; + } + } + + async fn cleanup_neighbours_task(self) { + loop { + self.overlay.entries_removed().notified().await; + self.neighbours.remove_outdated_neighbours().await; + } + } + + async fn send(&self, data: R) -> Result<(), Error> + where + R: tl_proto::TlWrite, + { + let Some(neighbour) = self.neighbours.choose().await else { + return Err(Error::NoNeighbours); + }; + + self.send_impl(neighbour, Request::from_tl(data)).await + } + + async fn query(&self, data: R) -> Result, Error> + where + R: tl_proto::TlWrite, + for<'a> A: tl_proto::TlRead<'a, Repr = tl_proto::Boxed>, + { + let Some(neighbour) = self.neighbours.choose().await else { + return Err(Error::NoNeighbours); + }; + + let res = self.query_impl(neighbour, Request::from_tl(data)).await?; + + let response = match tl_proto::deserialize::>(&res.data) { + Ok(r) => r, + Err(e) => { + res.reject(); + return Err(Error::InvalidResponse(e)); + } + }; + + match response { + overlay::Response::Ok(data) => Ok(QueryResponse { + data, + roundtrip_ms: res.roundtrip_ms, + neighbour: res.neighbour, + }), + overlay::Response::Err(code) => { + res.reject(); + Err(Error::RequestFailed(code)) + } + } + } + + async fn send_impl(&self, neighbour: Neighbour, req: Request) -> Result<(), Error> { + let started_at = Instant::now(); + + let res = self + .overlay + .send(&self.network, neighbour.peer_id(), req) + .await; + + let roundtrip = started_at.elapsed() * 2; // Multiply by 2 to estimate the roundtrip time + neighbour.track_request(&roundtrip, res.is_ok()); + + res.map_err(Error::NetworkError) + } + + async fn query_impl( + &self, + neighbour: Neighbour, + req: Request, + ) -> Result, Error> { + let started_at = Instant::now(); + + let res = self + .overlay + .query(&self.network, neighbour.peer_id(), req) + .await; + + let roundtrip = started_at.elapsed(); + + match res { + Ok(response) => Ok(QueryResponse { + data: response.body, + roundtrip_ms: roundtrip.as_millis() as u64, + neighbour, + }), + Err(e) => { + neighbour.track_request(&roundtrip, false); + Err(Error::NetworkError(e)) + } } } } -async fn start_neighbours_update(client: PublicOverlayClient) { - let mut interval = - tokio::time::interval(Duration::from_millis(client.neighbour_update_interval_ms())); - loop { - interval.tick().await; - client.update_neighbours().await; +impl Drop for Inner { + fn drop(&mut self) { + if let Some(handle) = self.ping_task.take() { + handle.abort(); + } + + if let Some(handle) = self.update_task.take() { + handle.abort(); + } + + if let Some(handle) = self.cleanup_task.take() { + handle.abort(); + } } } -async fn wait_update_neighbours(client: PublicOverlayClient) { - loop { - client.entries_removed().await; - client.remove_outdated_neighbours().await; +pub struct QueryResponse { + data: A, + neighbour: Neighbour, + roundtrip_ms: u64, +} + +impl QueryResponse { + pub fn data(&self) -> &A { + &self.data + } + + pub fn accept(self) -> (Neighbour, A) { + self.track_request(true); + (self.neighbour, self.data) + } + + pub fn reject(self) -> (Neighbour, A) { + self.track_request(false); + (self.neighbour, self.data) + } + + fn track_request(&self, success: bool) { + self.neighbour + .track_request(&Duration::from_millis(self.roundtrip_ms), success); } } diff --git a/core/src/overlay_client/neighbour.rs b/core/src/overlay_client/neighbour.rs index 681ad6224..c9d9c4fc3 100644 --- a/core/src/overlay_client/neighbour.rs +++ b/core/src/overlay_client/neighbour.rs @@ -1,66 +1,67 @@ use std::sync::Arc; +use std::time::Duration; +use parking_lot::RwLock; use tycho_network::PeerId; use tycho_util::time::now_sec; -#[derive(Debug, Copy, Clone)] -pub struct NeighbourOptions { - pub default_roundtrip_ms: u64, -} - #[derive(Clone)] -pub struct Neighbour(Arc); +#[repr(transparent)] +pub struct Neighbour { + inner: Arc, +} impl Neighbour { - pub fn new(peer_id: PeerId, expires_at: u32, options: NeighbourOptions) -> Self { - let default_roundtrip_ms = truncate_time(options.default_roundtrip_ms); - let stats = parking_lot::RwLock::new(TrackedStats::new(default_roundtrip_ms)); - - let state = Arc::new(NeighbourState { - peer_id, - expires_at, - stats, - }); - Self(state) + pub fn new(peer_id: PeerId, expires_at: u32, default_roundtrip: &Duration) -> Self { + Self { + inner: Arc::new(Inner { + peer_id, + expires_at, + stats: RwLock::new(TrackedStats::new(truncate_time(default_roundtrip))), + }), + } } #[inline] pub fn peer_id(&self) -> &PeerId { - &self.0.peer_id + &self.inner.peer_id } #[inline] pub fn expires_at_secs(&self) -> u32 { - self.0.expires_at + self.inner.expires_at } pub fn get_stats(&self) -> NeighbourStats { - let stats = self.0.stats.read(); + let stats = self.inner.stats.read(); NeighbourStats { score: stats.score, total_requests: stats.total, failed_requests: stats.failed, - avg_roundtrip: stats.roundtrip.get_avg(), + avg_roundtrip: stats + .roundtrip + .get_avg() + .map(|avg| Duration::from_millis(avg as u64)), created: stats.created, } } pub fn is_reliable(&self) -> bool { - self.0.stats.read().higher_than_threshold() + self.inner.stats.read().higher_than_threshold() } pub fn compute_selection_score(&self) -> Option { - self.0.stats.read().compute_selection_score() + self.inner.stats.read().compute_selection_score() } - pub fn get_roundtrip(&self) -> Option { - let roundtrip = self.0.stats.read().roundtrip.get_avg()?; - Some(roundtrip as u64) + pub fn get_roundtrip(&self) -> Option { + let roundtrip = self.inner.stats.read().roundtrip.get_avg()?; + Some(Duration::from_millis(roundtrip as u64)) } - pub fn track_request(&self, roundtrip: u64, success: bool) { + pub fn track_request(&self, roundtrip: &Duration, success: bool) { let roundtrip = truncate_time(roundtrip); - self.0.stats.write().update(roundtrip, success); + self.inner.stats.write().update(roundtrip, success); } } @@ -73,14 +74,14 @@ pub struct NeighbourStats { pub total_requests: u64, /// The number of failed requests to the neighbour. pub failed_requests: u64, - /// Average ADNL roundtrip in milliseconds. - /// NONE if there were no ADNL requests to the neighbour. - pub avg_roundtrip: Option, + /// Average roundtrip. + /// NONE if there were no requests to the neighbour. + pub avg_roundtrip: Option, /// Neighbour first appearance pub created: u32, } -struct NeighbourState { +struct Inner { peer_id: PeerId, expires_at: u32, stats: parking_lot::RwLock, @@ -193,6 +194,6 @@ impl PackedSmaBuffer { } } -fn truncate_time(roundtrip: u64) -> u16 { - std::cmp::min(roundtrip, u16::MAX as u64) as u16 +fn truncate_time(roundtrip: &Duration) -> u16 { + std::cmp::min(roundtrip.as_millis() as u64, u16::MAX as u64) as u16 } diff --git a/core/src/overlay_client/neighbours.rs b/core/src/overlay_client/neighbours.rs index f3fabf35a..7aa5eed55 100644 --- a/core/src/overlay_client/neighbours.rs +++ b/core/src/overlay_client/neighbours.rs @@ -5,7 +5,6 @@ use rand::Rng; use tokio::sync::Mutex; use crate::overlay_client::neighbour::Neighbour; -use crate::overlay_client::settings::NeighboursOptions; #[derive(Clone)] #[repr(transparent)] @@ -14,23 +13,19 @@ pub struct Neighbours { } impl Neighbours { - pub fn new(entries: Vec, options: NeighboursOptions) -> Self { - let mut selection_index = SelectionIndex::new(options.max_neighbours); + pub fn new(entries: Vec, max_neighbours: usize) -> Self { + let mut selection_index = SelectionIndex::new(max_neighbours); selection_index.update(&entries); Self { inner: Arc::new(Inner { - options, + max_neighbours, entries: Mutex::new(entries), selection_index: Mutex::new(selection_index), }), } } - pub fn options(&self) -> &NeighboursOptions { - &self.inner.options - } - pub async fn choose(&self) -> Option { let selection_index = self.inner.selection_index.lock().await; selection_index.get(&mut rand::thread_rng()) @@ -63,7 +58,7 @@ impl Neighbours { guard.retain(|x| x.is_reliable() && x.expires_at_secs() > now); // if all neighbours are reliable and valid then remove the worst - if guard.len() >= self.inner.options.max_neighbours { + if guard.len() >= self.inner.max_neighbours { if let Some(worst) = guard .iter() .min_by(|l, r| l.get_stats().score.cmp(&r.get_stats().score)) @@ -78,7 +73,7 @@ impl Neighbours { if guard.iter().any(|x| x.peer_id() == n.peer_id()) { continue; } - if guard.len() < self.inner.options.max_neighbours { + if guard.len() < self.inner.max_neighbours { guard.push(n); } } @@ -99,7 +94,7 @@ impl Neighbours { } struct Inner { - options: NeighboursOptions, + max_neighbours: usize, entries: Mutex>, selection_index: Mutex, } diff --git a/core/src/overlay_client/public_overlay_client.rs b/core/src/overlay_client/public_overlay_client.rs deleted file mode 100644 index 9e48b78ca..000000000 --- a/core/src/overlay_client/public_overlay_client.rs +++ /dev/null @@ -1,250 +0,0 @@ -use std::sync::Arc; -use std::time::{Duration, Instant}; - -use anyhow::{Error, Result}; -use tycho_network::Network; -use tycho_network::{PublicOverlay, Request}; - -use crate::overlay_client::neighbour::{Neighbour, NeighbourOptions}; -use crate::overlay_client::neighbours::Neighbours; -use crate::overlay_client::settings::{OverlayClientSettings, OverlayOptions}; -use crate::proto::overlay::{Ping, Pong, Response}; - -#[derive(Clone)] -#[repr(transparent)] -pub struct PublicOverlayClient(Arc); - -impl PublicOverlayClient { - pub fn new(network: Network, overlay: PublicOverlay, settings: OverlayClientSettings) -> Self { - let neighbour_options = NeighbourOptions { - default_roundtrip_ms: settings.neighbours_options.default_roundtrip_ms, - }; - - let ttl = overlay.entry_ttl_sec(); - let entries = { - overlay - .read_entries() - .choose_multiple( - &mut rand::thread_rng(), - settings.neighbours_options.max_neighbours, - ) - .map(|entry_data| { - Neighbour::new( - entry_data.entry.peer_id, - entry_data.expires_at(ttl), - neighbour_options, - ) - }) - .collect::>() - }; - - let neighbours = Neighbours::new(entries, settings.neighbours_options); - - let inner = Arc::new(OverlayClientState { - network, - overlay, - neighbours, - settings: settings.overlay_options, - }); - - Self(inner) - } - - pub fn neighbours(&self) -> &Neighbours { - &self.0.neighbours - } - - pub async fn send(&self, data: R) -> Result<()> - where - R: tl_proto::TlWrite, - { - let Some(neighbour) = self.0.neighbours.choose().await else { - tracing::error!("No neighbours found to send request"); - return Err(Error::msg("Failed to ping")); //TODO: proper error - }; - - self.0 - .overlay - .send(&self.0.network, neighbour.peer_id(), Request::from_tl(data)) - .await?; - Ok(()) - } - - pub async fn query(&self, data: R) -> Result> - where - R: tl_proto::TlWrite, - for<'a> A: tl_proto::TlRead<'a, Repr = tl_proto::Boxed>, - { - let Some(neighbour) = self.0.neighbours.choose().await else { - tracing::error!("No neighbours found to send request"); - return Err(Error::msg("Failed to ping")); //TODO: proper error - }; - - let start_time = Instant::now(); - let response_opt = self - .0 - .overlay - .query(&self.0.network, neighbour.peer_id(), Request::from_tl(data)) - .await; - let roundtrip = start_time.elapsed(); - - match response_opt { - Ok(response) => { - let response = response.parse_tl::>()?; - let response_model = match response { - Response::Ok(r) => r, - Response::Err(code) => { - return Err(Error::msg(format!("Failed to get response: {code}"))) - } - }; - - Ok(QueryResponse { - data: response_model, - roundtrip: roundtrip.as_millis() as u64, - neighbour: neighbour.clone(), - }) - } - Err(e) => { - tracing::error!(peer_id = %neighbour.peer_id(), "Failed to get response from peer. Err: {e:?}"); - Err(e) - } - } - } - - pub async fn wait_entries_removed(&self) { - self.0.overlay.entries_removed().notified().await; - } - - pub fn neighbour_update_interval_ms(&self) -> u64 { - self.0.settings.neighbours_update_interval - } - - pub fn neighbour_ping_interval_ms(&self) -> u64 { - self.0.settings.neighbours_ping_interval - } - - pub async fn update_neighbours(&self) { - let active_neighbours = self.neighbours().get_active_neighbours().await.len(); - let max_neighbours = self.neighbours().options().max_neighbours; - - let neighbours_to_get = max_neighbours + (max_neighbours - active_neighbours); - let neighbour_options = self.neighbours().options().clone(); - - let neighbour_options = NeighbourOptions { - default_roundtrip_ms: neighbour_options.default_roundtrip_ms, - }; - let neighbours = { - self.0 - .overlay - .read_entries() - .choose_multiple(&mut rand::thread_rng(), neighbours_to_get) - .map(|x| { - Neighbour::new( - x.entry.peer_id, - x.expires_at(self.0.overlay.entry_ttl_sec()), - neighbour_options, - ) - }) - .collect::>() - }; - self.neighbours().update(neighbours).await; - } - - pub async fn remove_outdated_neighbours(&self) { - self.neighbours().remove_outdated_neighbours().await; - } - - pub async fn ping_random_neighbour(&self) -> Result<()> { - let Some(neighbour) = self.0.neighbours.choose().await else { - tracing::error!("No neighbours found to ping"); - return Err(Error::msg("Failed to ping")); - }; - tracing::info!( - peer_id = %neighbour.peer_id(), - stats = ?neighbour.get_stats(), - "Selected neighbour to ping", - ); - - let start_time = Instant::now(); - - let pong_res = self - .0 - .overlay - .query(&self.0.network, neighbour.peer_id(), Request::from_tl(Ping)) - .await; - - let end_time = Instant::now(); - - let success = match pong_res { - Ok(response) => { - response.parse_tl::()?; - tracing::info!(peer_id = %neighbour.peer_id(), "Pong received", ); - true - } - Err(e) => { - tracing::error!(peer_id = %neighbour.peer_id(), "Failed to received pong. Error: {e:?}"); - false - } - }; - - neighbour.track_request( - end_time.duration_since(start_time).as_millis() as u64, - success, - ); - self.neighbours().update_selection_index().await; - - Ok(()) - } -} - -async fn start_neighbours_ping(client: PublicOverlayClient) { - let mut interval = - tokio::time::interval(Duration::from_millis(client.neighbour_update_interval_ms())); - - loop { - interval.tick().await; - if let Err(e) = client.ping_random_neighbour().await { - tracing::error!("Failed to ping random neighbour. Error: {e:?}"); - } - } -} - -async fn start_neighbours_update(client: PublicOverlayClient) { - let mut interval = - tokio::time::interval(Duration::from_millis(client.neighbour_update_interval_ms())); - loop { - interval.tick().await; - client.update_neighbours().await; - } -} - -async fn wait_update_neighbours(client: PublicOverlayClient) { - loop { - client.wait_entries_removed().await; - client.remove_outdated_neighbours().await; - } -} - -struct OverlayClientState { - network: Network, - overlay: PublicOverlay, - neighbours: Neighbours, - - settings: OverlayOptions, -} - -pub struct QueryResponse { - data: A, - neighbour: Neighbour, - roundtrip: u64, -} - -impl QueryResponse { - pub fn data(&self) -> &A { - &self.data - } - - pub fn mark_response(&self, success: bool) { - self.neighbour.track_request(self.roundtrip, success); - } -} diff --git a/core/src/overlay_client/settings.rs b/core/src/overlay_client/settings.rs deleted file mode 100644 index d509e4876..000000000 --- a/core/src/overlay_client/settings.rs +++ /dev/null @@ -1,39 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, Serialize, Deserialize, Default)] -pub struct OverlayClientSettings { - pub overlay_options: OverlayOptions, - pub neighbours_options: NeighboursOptions, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct OverlayOptions { - pub neighbours_update_interval: u64, - pub neighbours_ping_interval: u64, -} - -impl Default for OverlayOptions { - fn default() -> Self { - Self { - neighbours_update_interval: 60 * 2 * 1000, - neighbours_ping_interval: 2000, - } - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NeighboursOptions { - pub max_neighbours: usize, - pub max_ping_tasks: usize, - pub default_roundtrip_ms: u64, -} - -impl Default for NeighboursOptions { - fn default() -> Self { - Self { - max_neighbours: 5, - max_ping_tasks: 6, - default_roundtrip_ms: 2000, - } - } -} diff --git a/core/tests/block_strider.rs b/core/tests/block_strider.rs index bfe176361..13b71fb3e 100644 --- a/core/tests/block_strider.rs +++ b/core/tests/block_strider.rs @@ -5,8 +5,7 @@ use futures_util::stream::FuturesUnordered; use futures_util::StreamExt; use tycho_core::block_strider::provider::BlockProvider; use tycho_core::blockchain_client::BlockchainClient; -use tycho_core::overlay_client::public_overlay_client::PublicOverlayClient; -use tycho_core::overlay_client::settings::OverlayClientSettings; +use tycho_core::overlay_client::{PublicOverlayClient, PublicOverlayClientConfig}; use tycho_network::PeerId; mod common; @@ -115,9 +114,8 @@ async fn overlay_block_strider() -> anyhow::Result<()> { PublicOverlayClient::new( node.network().clone(), node.public_overlay().clone(), - OverlayClientSettings::default(), - ) - .await, + PublicOverlayClientConfig::default(), + ), Default::default(), ); diff --git a/core/tests/overlay_client.rs b/core/tests/overlay_client.rs index 20f81a107..9d3f201fa 100644 --- a/core/tests/overlay_client.rs +++ b/core/tests/overlay_client.rs @@ -1,10 +1,9 @@ +use std::time::Duration; + use rand::distributions::{Distribution, WeightedIndex}; use rand::thread_rng; use tl_proto::{TlRead, TlWrite}; -use tycho_core::overlay_client::neighbour::{Neighbour, NeighbourOptions}; -use tycho_core::overlay_client::neighbours::Neighbours; -use tycho_core::overlay_client::public_overlay_client::Peer; -use tycho_core::overlay_client::settings::NeighboursOptions; +use tycho_core::overlay_client::{Neighbour, Neighbours}; use tycho_network::PeerId; #[derive(TlWrite, TlRead)] @@ -13,7 +12,9 @@ struct TestResponse; #[tokio::test] pub async fn test() { - let options = NeighboursOptions::default(); + let max_neighbours = 5; + let default_roundtrip = Duration::from_millis(300); + let initial_peers = vec![ PeerId([0u8; 32]), PeerId([1u8; 32]), @@ -21,15 +22,13 @@ pub async fn test() { PeerId([3u8; 32]), PeerId([4u8; 32]), ] - .iter() - .map(|x| Peer { - id: *x, - expires_at: u32::MAX, - }) + .into_iter() + .map(|peer_id| Neighbour::new(peer_id, u32::MAX, &default_roundtrip)) .collect::>(); + println!("{}", initial_peers.len()); - let neighbours = Neighbours::new(initial_peers.clone(), options.clone()).await; + let neighbours = Neighbours::new(initial_peers.clone(), max_neighbours); println!("{}", neighbours.get_active_neighbours().await.len()); let first_success_rate = [0.2, 0.8]; @@ -55,14 +54,17 @@ pub async fn test() { //let end = Instant::now(); if let Some(n) = n_opt { - let index = slice.iter().position(|r| r.id == n.peer_id()).unwrap(); + let index = slice + .iter() + .position(|r| r.peer_id() == n.peer_id()) + .unwrap(); let answer = indices[index].sample(&mut rng); if answer == 0 { println!("Success request to peer: {}", n.peer_id()); - n.track_request(200, true) + n.track_request(&Duration::from_millis(200), true) } else { println!("Failed request to peer: {}", n.peer_id()); - n.track_request(200, false) + n.track_request(&Duration::from_millis(200), false) } neighbours.update_selection_index().await; @@ -70,33 +72,16 @@ pub async fn test() { i = i + 1; } - let new_peers = vec![ + let new_neighbours = vec![ PeerId([5u8; 32]), PeerId([6u8; 32]), PeerId([7u8; 32]), PeerId([8u8; 32]), PeerId([9u8; 32]), ] - .iter() - .map(|x| Peer { - id: *x, - expires_at: u32::MAX, - }) + .into_iter() + .map(|peer_id| Neighbour::new(peer_id, u32::MAX, &default_roundtrip)) .collect::>(); - - let new_neighbours = new_peers - .iter() - .map(|x| { - Neighbour::new( - x.id, - x.expires_at, - NeighbourOptions { - default_roundtrip_ms: options.default_roundtrip_ms, - }, - ) - }) - .collect::>(); - neighbours.update(new_neighbours).await; let active = neighbours.get_active_neighbours().await; diff --git a/core/tests/overlay_server.rs b/core/tests/overlay_server.rs index a3223b813..0ecca5fa2 100644 --- a/core/tests/overlay_server.rs +++ b/core/tests/overlay_server.rs @@ -6,8 +6,7 @@ use everscale_types::models::BlockId; use futures_util::stream::FuturesUnordered; use futures_util::StreamExt; use tycho_core::blockchain_client::BlockchainClient; -use tycho_core::overlay_client::public_overlay_client::PublicOverlayClient; -use tycho_core::overlay_client::settings::OverlayClientSettings; +use tycho_core::overlay_client::PublicOverlayClient; use tycho_core::overlay_server::DEFAULT_ERROR_CODE; use tycho_core::proto::overlay::{BlockFull, KeyBlockIds, PersistentStatePart}; use tycho_network::PeerId; @@ -94,27 +93,26 @@ async fn overlay_server_with_empty_storage() -> Result<()> { PublicOverlayClient::new( node.network().clone(), node.public_overlay().clone(), - OverlayClientSettings::default(), - ) - .await, + Default::default(), + ), Default::default(), ); - let result = client.get_block_full(BlockId::default()).await; + let result = client.get_block_full(&BlockId::default()).await; assert!(result.is_ok()); if let Ok(response) = &result { assert_eq!(response.data(), &BlockFull::Empty); } - let result = client.get_next_block_full(BlockId::default()).await; + let result = client.get_next_block_full(&BlockId::default()).await; assert!(result.is_ok()); if let Ok(response) = &result { assert_eq!(response.data(), &BlockFull::Empty); } - let result = client.get_next_key_block_ids(BlockId::default(), 10).await; + let result = client.get_next_key_block_ids(&BlockId::default(), 10).await; assert!(result.is_ok()); if let Ok(response) = &result { @@ -126,7 +124,7 @@ async fn overlay_server_with_empty_storage() -> Result<()> { } let result = client - .get_persistent_state_part(BlockId::default(), BlockId::default(), 0, 0) + .get_persistent_state_part(&BlockId::default(), &BlockId::default(), 0, 0) .await; assert!(result.is_ok()); @@ -238,16 +236,15 @@ async fn overlay_server_blocks() -> Result<()> { PublicOverlayClient::new( node.network().clone(), node.public_overlay().clone(), - OverlayClientSettings::default(), - ) - .await, + Default::default(), + ), Default::default(), ); let archive = common::storage::get_archive()?; for (block_id, archive_data) in archive.blocks { if block_id.shard.is_masterchain() { - let result = client.get_block_full(block_id.clone()).await; + let result = client.get_block_full(&block_id).await; assert!(result.is_ok()); if let Ok(response) = &result {