Skip to content

Commit

Permalink
refactor(overlay-client): simplify overlay client
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Apr 26, 2024
1 parent ec6a5e4 commit 568b295
Show file tree
Hide file tree
Showing 11 changed files with 495 additions and 475 deletions.
16 changes: 8 additions & 8 deletions core/src/block_strider/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,25 +91,25 @@ impl BlockProvider for BlockchainClient {
let config = self.config();

loop {
let res = self.get_next_block_full(*prev_block_id).await;
let res = self.get_next_block_full(prev_block_id).await;

let block = match res {
Ok(res) if matches!(res.data(), BlockFull::Found { .. }) => {
let (block_id, data) = match res.data() {
BlockFull::Found {
block_id, block, ..
} => (*block_id, block),
} => (*block_id, block.clone()),
BlockFull::Empty => unreachable!(),
};

match BlockStuff::deserialize_checked(block_id, data) {
match BlockStuff::deserialize_checked(block_id, &data) {
Ok(block) => {
res.mark_response(true);
Some(Ok(BlockStuffAug::new(block, data.clone())))
res.accept();
Some(Ok(BlockStuffAug::new(block, data)))
}
Err(e) => {
tracing::error!("failed to deserialize block: {:?}", e);
res.mark_response(false);
res.reject();
None
}
}
Expand All @@ -135,7 +135,7 @@ impl BlockProvider for BlockchainClient {
let config = self.config();

loop {
let res = match self.get_block_full(*block_id).await {
let res = match self.get_block_full(block_id).await {
Ok(res) => res,
Err(e) => {
tracing::error!("failed to get block: {:?}", e);
Expand All @@ -152,7 +152,7 @@ impl BlockProvider for BlockchainClient {
} => match BlockStuff::deserialize_checked(*block_id, data) {
Ok(block) => Some(Ok(BlockStuffAug::new(block, data.clone()))),
Err(e) => {
res.mark_response(false);
res.accept();
tracing::error!("failed to deserialize block: {:?}", e);
tokio::time::sleep(config.get_block_polling_interval).await;
continue;
Expand Down
131 changes: 72 additions & 59 deletions core/src/blockchain_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,63 +4,94 @@ use std::time::Duration;
use anyhow::Result;
use everscale_types::models::BlockId;

use crate::overlay_client::public_overlay_client::*;
use crate::overlay_client::{PublicOverlayClient, QueryResponse};
use crate::proto::overlay::rpc::*;
use crate::proto::overlay::*;

pub struct BlockchainClientConfig {
pub get_next_block_polling_interval: Duration,
pub get_block_polling_interval: Duration,
}

impl Default for BlockchainClientConfig {
fn default() -> Self {
Self {
get_block_polling_interval: Duration::from_millis(50),
get_next_block_polling_interval: Duration::from_millis(50),
}
}
}

#[derive(Clone)]
#[repr(transparent)]
pub struct BlockchainClient {
client: PublicOverlayClient,
inner: Arc<Inner>,
}

struct Inner {
overlay_client: PublicOverlayClient,
config: BlockchainClientConfig,
}

impl BlockchainClient {
pub fn new(
overlay_client: PublicOverlayClient,
config: BlockchainClientConfig,
) -> Arc<BlockchainClient> {
Arc::new(Self {
client: overlay_client,
config,
})
pub fn new(overlay_client: PublicOverlayClient, config: BlockchainClientConfig) -> Self {
Self {
inner: Arc::new(Inner {
overlay_client,
config,
}),
}
}

pub fn overlay_client(&self) -> &PublicOverlayClient {
&self.inner.overlay_client
}

pub fn config(&self) -> &BlockchainClientConfig {
&self.inner.config
}

pub async fn get_next_key_block_ids(
&self,
block: BlockId,
block: &BlockId,
max_size: u32,
) -> Result<QueryResponse<'_, KeyBlockIds>> {
let data = self
.client
.query::<GetNextKeyBlockIds, KeyBlockIds>(GetNextKeyBlockIds { block, max_size })
) -> Result<QueryResponse<KeyBlockIds>> {
let client = &self.inner.overlay_client;
let data = client
.query::<_, KeyBlockIds>(&GetNextKeyBlockIds {
block: *block,
max_size,
})
.await?;
Ok(data)
}

pub async fn get_block_full(&self, block: BlockId) -> Result<QueryResponse<'_, BlockFull>> {
let data = self
.client
.query::<GetBlockFull, BlockFull>(GetBlockFull { block })
pub async fn get_block_full(&self, block: &BlockId) -> Result<QueryResponse<BlockFull>> {
let client = &self.inner.overlay_client;
let data = client
.query::<_, BlockFull>(GetBlockFull { block: *block })
.await?;
Ok(data)
}

pub async fn get_next_block_full(
&self,
prev_block: BlockId,
) -> Result<QueryResponse<'_, BlockFull>> {
let data = self
.client
.query::<GetNextBlockFull, BlockFull>(GetNextBlockFull { prev_block })
prev_block: &BlockId,
) -> Result<QueryResponse<BlockFull>> {
let client = &self.inner.overlay_client;
let data = client
.query::<_, BlockFull>(GetNextBlockFull {
prev_block: *prev_block,
})
.await?;
Ok(data)
}

pub async fn get_archive_info(&self, mc_seqno: u32) -> Result<QueryResponse<'_, ArchiveInfo>> {
let data = self
.client
.query::<GetArchiveInfo, ArchiveInfo>(GetArchiveInfo { mc_seqno })
pub async fn get_archive_info(&self, mc_seqno: u32) -> Result<QueryResponse<ArchiveInfo>> {
let client = &self.inner.overlay_client;
let data = client
.query::<_, ArchiveInfo>(GetArchiveInfo { mc_seqno })
.await?;

Ok(data)
}

Expand All @@ -69,10 +100,10 @@ impl BlockchainClient {
archive_id: u64,
offset: u64,
max_size: u32,
) -> Result<QueryResponse<'_, Data>> {
let data = self
.client
.query::<GetArchiveSlice, Data>(GetArchiveSlice {
) -> Result<QueryResponse<Data>> {
let client = &self.inner.overlay_client;
let data = client
.query::<_, Data>(GetArchiveSlice {
archive_id,
offset,
max_size,
Expand All @@ -83,38 +114,20 @@ impl BlockchainClient {

pub async fn get_persistent_state_part(
&self,
mc_block: BlockId,
block: BlockId,
mc_block: &BlockId,
block: &BlockId,
offset: u64,
max_size: u64,
) -> Result<QueryResponse<'_, PersistentStatePart>> {
let data = self
.client
.query::<GetPersistentStatePart, PersistentStatePart>(GetPersistentStatePart {
block,
mc_block,
) -> Result<QueryResponse<PersistentStatePart>> {
let client = &self.inner.overlay_client;
let data = client
.query::<_, PersistentStatePart>(GetPersistentStatePart {
block: *block,
mc_block: *mc_block,
offset,
max_size,
})
.await?;
Ok(data)
}

pub fn config(&self) -> &BlockchainClientConfig {
&self.config
}
}

pub struct BlockchainClientConfig {
pub get_next_block_polling_interval: Duration,
pub get_block_polling_interval: Duration,
}

impl Default for BlockchainClientConfig {
fn default() -> Self {
Self {
get_block_polling_interval: Duration::from_millis(50),
get_next_block_polling_interval: Duration::from_millis(50),
}
}
}
49 changes: 49 additions & 0 deletions core/src/overlay_client/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::time::Duration;

use serde::{Deserialize, Serialize};
use tycho_util::serde_helpers;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
#[non_exhaustive]
pub struct PublicOverlayClientConfig {
/// The interval at which neighbours list is updated.
///
/// Default: 2 minutes.
#[serde(with = "serde_helpers::humantime")]
pub neighbours_update_interval: Duration,

/// The interval at which current neighbours are pinged.
///
/// Default: 30 seconds.
#[serde(with = "serde_helpers::humantime")]
pub neighbours_ping_interval: Duration,

/// The maximum number of neighbours to keep.
///
/// Default: 5.
pub max_neighbours: usize,

/// The maximum number of ping tasks to run concurrently.
///
/// Default: 5.
pub max_ping_tasks: usize,

/// The default roundtrip time to use when a neighbour is added.
///
/// Default: 300 ms.
#[serde(with = "serde_helpers::humantime")]
pub default_roundtrip: Duration,
}

impl Default for PublicOverlayClientConfig {
fn default() -> Self {
Self {
neighbours_update_interval: Duration::from_secs(2 * 60),
neighbours_ping_interval: Duration::from_secs(30),
max_neighbours: 5,
max_ping_tasks: 5,
default_roundtrip: Duration::from_millis(300),
}
}
}
Loading

0 comments on commit 568b295

Please sign in to comment.