Skip to content

Commit

Permalink
feat(core): rework persistent state storage and downloader
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Jun 3, 2024
1 parent 760f388 commit 68fac03
Show file tree
Hide file tree
Showing 16 changed files with 602 additions and 376 deletions.
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ license.workspace = true
anyhow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true, features = ["serde"] }
bytesize = { workspace = true }
castaway = { workspace = true }
everscale-types = { workspace = true }
futures-util = { workspace = true }
Expand All @@ -33,7 +34,6 @@ tycho-storage = { workspace = true }
tycho-util = { workspace = true }

[dev-dependencies]
bytesize = { workspace = true }
everscale-crypto = { workspace = true }
tempfile = { workspace = true }
tracing-test = { workspace = true }
Expand Down
165 changes: 152 additions & 13 deletions core/src/blockchain_rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ use std::sync::Arc;
use anyhow::Result;
use everscale_types::models::BlockId;
use futures_util::stream::{FuturesUnordered, StreamExt};
use tycho_block_util::state::ShardStateStuff;
use tycho_network::{PublicOverlay, Request};
use tycho_storage::Storage;
use tycho_util::futures::JoinTask;

use crate::overlay_client::{Error, PublicOverlayClient, QueryResponse};
use crate::overlay_client::{Error, Neighbour, PublicOverlayClient, QueryResponse};
use crate::proto::blockchain::*;
use crate::proto::overlay::BroadcastPrefix;

Expand Down Expand Up @@ -127,36 +130,172 @@ impl BlockchainRpcClient {
/// Requests a slice of the archive identified by `archive_id`.
///
/// Builds an `rpc::GetArchiveSlice` request from the arguments, sends it through
/// the overlay client and returns the `Data` response, or the query error.
// NOTE(review): this span comes from a diff view and lists both `limit` and
// `max_size` (parameter and request field) — they look like the two sides of a
// rename; confirm which one the final code keeps.
pub async fn get_archive_slice(
&self,
archive_id: u64,
limit: u32,
offset: u64,
max_size: u32,
) -> Result<QueryResponse<Data>, Error> {
let client = &self.inner.overlay_client;
let data = client
.query::<_, Data>(&rpc::GetArchiveSlice {
archive_id,
limit,
offset,
max_size,
})
.await?;
Ok(data)
}

/// Asks the overlay for information about the persistent state of `block_id`.
///
/// Sends an `rpc::GetPersistentStateInfo` query through the overlay client and
/// returns the `PersistentStateInfo` response, or the error reported by the query.
pub async fn get_persistent_state_info(
    &self,
    block_id: &BlockId,
) -> Result<QueryResponse<PersistentStateInfo>, Error> {
    let request = rpc::GetPersistentStateInfo {
        block_id: *block_id,
    };

    let response = self
        .inner
        .overlay_client
        .query::<_, PersistentStateInfo>(&request)
        .await?;

    Ok(response)
}

/// Requests one part of a persistent state with an `rpc::GetPersistentStatePart` query.
// NOTE(review): this span comes from a diff view and interleaves two variants of
// the method — confirm which one is current:
//   * one takes `mc_block` / `block` / `limit: u64`, sends the request with
//     `query` and returns `PersistentStatePart`;
//   * the other takes `neighbour` / `block_id` / `limit: u32`, sends the request
//     to that specific neighbour with `query_raw` and returns `Data`.
pub async fn get_persistent_state_part(
&self,
mc_block: &BlockId,
block: &BlockId,
limit: u64,
neighbour: &Neighbour,
block_id: &BlockId,
limit: u32,
offset: u64,
) -> Result<QueryResponse<PersistentStatePart>, Error> {
) -> Result<QueryResponse<Data>, Error> {
let client = &self.inner.overlay_client;
let data = client
.query::<_, PersistentStatePart>(&rpc::GetPersistentStatePart {
block_id: *block,
mc_block_id: *mc_block,
offset,
max_size: limit,
})
.query_raw::<Data>(
neighbour.clone(),
Request::from_tl(rpc::GetPersistentStatePart {
block_id: *block_id,
limit,
offset,
}),
)
.await?;
Ok(data)
}

pub async fn download_and_store_state(
&self,
block_id: &BlockId,
storage: Storage,
) -> Result<ShardStateStuff, Error> {
const PARALLEL_REQUESTS: usize = 10;
const CHUNK_SIZE: u32 = 2 << 20; // 2 MB
const MAX_STATE_SIZE: u64 = 10 << 30; // 10 GB

// TODO: Iterate through all known (or unknown) neighbours
const NEIGHBOUR_COUNT: usize = 10;
let neighbours = self
.overlay_client()
.neighbours()
.choose_multiple(NEIGHBOUR_COUNT)
.await;

// Find a neighbour which has the requested state
let (neighbour, max_size) = 'info: {
let req = Request::from_tl(rpc::GetPersistentStateInfo {
block_id: *block_id,
});

let mut futures = FuturesUnordered::new();
for neighbour in neighbours {
futures.push(self.overlay_client().query_raw(neighbour, req.clone()));
}

let mut err = None;
while let Some(info) = futures.next().await {
let (handle, info) = match info {
Ok(res) => res.split(),
Err(e) => {
err = Some(e);
continue;
}
};

match info {
PersistentStateInfo::Found { size } if size <= MAX_STATE_SIZE => {
break 'info (handle.accept(), size)
}
PersistentStateInfo::Found { size } => {
let neighbour = handle.reject();
tracing::warn!(
peer_id = %neighbour.peer_id(),
size,
"malicious neighbour has a too large state",
);
return Err(Error::Internal(anyhow::anyhow!("malicious neighbour")));
}
PersistentStateInfo::NotFound => continue,
}
}

return match err {
None => Err(Error::Internal(anyhow::anyhow!(
"no neighbour has the requested state"
))),
Some(err) => Err(err),
};
};

// Download the state
let chunk_count = (max_size + CHUNK_SIZE as u64 - 1) / CHUNK_SIZE as u64;
let mut stream =
futures_util::stream::iter((0..chunk_count).map(|i| i * CHUNK_SIZE as u64))
.map(|offset| {
let neighbour = neighbour.clone();
let req = Request::from_tl(rpc::GetPersistentStatePart {
block_id: *block_id,
limit: CHUNK_SIZE,
offset,
});

let client = self.overlay_client().clone();
JoinTask::new(async move {
// TODO: Retry on error
client.query_raw::<Data>(neighbour, req).await
})
})
.buffered(PARALLEL_REQUESTS);

let mut store_state_op = storage
.shard_state_storage()
.begin_store_state_raw(block_id)
.map(Box::new)
.map_err(Error::Internal)?;

// NOTE: Buffered items in stream will be polled because they are spawned as tasks
while let Some(response) = stream.next().await.transpose()? {
let (op, finished) = tokio::task::spawn_blocking(move || {
let (handle, part) = response.split();
match store_state_op.process_part(part.data) {
Ok(finished) => Ok((store_state_op, finished)),
Err(e) => {
handle.reject();
Err(e)
}
}
})
.await
.map_err(|e| Error::Internal(e.into()))?
.map_err(Error::Internal)?;

if !finished {
store_state_op = op;
continue;
}

return tokio::task::spawn_blocking(move || op.finalize())
.await
.map_err(|e| Error::Internal(e.into()))?
.map_err(Error::Internal);
}

Err(Error::Internal(anyhow::anyhow!(
"downloaded incomplete state"
)))
}
}
4 changes: 3 additions & 1 deletion core/src/blockchain_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ pub use self::service::{
mod client;
mod service;

// Error codes for the blockchain RPC service.
// NOTE(review): `INTERNAL_ERROR_CODE` is listed twice (values 1 and 2) — this span
// comes from a diff view and appears to show both the old and the new definition;
// confirm which one applies.
pub const INTERNAL_ERROR_CODE: u32 = 1;
pub const BAD_REQUEST_ERROR_CODE: u32 = 1;
pub const INTERNAL_ERROR_CODE: u32 = 2;
pub const NOT_FOUND_ERROR_CODE: u32 = 3;
111 changes: 68 additions & 43 deletions core/src/blockchain_rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ use std::sync::Arc;

use anyhow::Context;
use bytes::{Buf, Bytes};
use bytesize::ByteSize;
use futures_util::Future;
use serde::{Deserialize, Serialize};
use tycho_network::{InboundRequestMeta, Response, Service, ServiceRequest};
use tycho_storage::{BlockConnection, KeyBlocksDirection, Storage};
use tycho_util::futures::BoxFutureOrNoop;

use crate::blockchain_rpc::INTERNAL_ERROR_CODE;
use crate::blockchain_rpc::{BAD_REQUEST_ERROR_CODE, INTERNAL_ERROR_CODE, NOT_FOUND_ERROR_CODE};
use crate::proto::blockchain::*;
use crate::proto::overlay;

Expand Down Expand Up @@ -196,12 +197,20 @@ impl<B: BroadcastListener> Service<ServiceRequest> for BlockchainRpcService<B> {
Some(Response::from_tl(res))
})
},
rpc::GetPersistentStateInfo as req => {
tracing::debug!(block_id = %req.block_id, "getPersistentStateInfo");

let inner = self.inner.clone();
BoxFutureOrNoop::future(async move {
let res = inner.handle_get_persistent_state_info(&req);
Some(Response::from_tl(res))
})
},
rpc::GetPersistentStatePart as req => {
tracing::debug!(
block_id = %req.block_id,
mc_block_id = %req.mc_block_id,
limit = %req.limit,
offset = %req.offset,
max_size = %req.max_size,
"getPersistentStatePart"
);

Expand All @@ -223,8 +232,8 @@ impl<B: BroadcastListener> Service<ServiceRequest> for BlockchainRpcService<B> {
rpc::GetArchiveSlice as req => {
tracing::debug!(
archive_id = %req.archive_id,
limit = %req.limit,
offset = %req.offset,
max_size = %req.max_size,
"getArchiveSlice"
);

Expand Down Expand Up @@ -439,44 +448,6 @@ impl<B> Inner<B> {
}
}

/// Handles an `rpc::GetPersistentStatePart` request addressed by
/// `mc_block_id` / `block_id`.
///
/// Responds with:
/// - `Err(INTERNAL_ERROR_CODE)` when serving persistent states is disabled in the
///   config or `req.max_size` exceeds `PART_MAX_SIZE`;
/// - `Ok(PersistentStatePart::NotFound)` when the storage reports that the state
///   does not exist or reading the requested part fails;
/// - `Ok(PersistentStatePart::Found { data })` otherwise.
async fn handle_get_persistent_state_part(
&self,
req: &rpc::GetPersistentStatePart,
) -> overlay::Response<PersistentStatePart> {
// Upper bound for a single requested part: 2 MiB.
const PART_MAX_SIZE: u64 = 1 << 21;

let persistent_state_storage = self.storage().persistent_state_storage();

// Closure so that `anyhow::ensure!` can early-return a single validation error.
let persistent_state_request_validation = || {
anyhow::ensure!(
self.config.serve_persistent_states,
"persistent states are disabled"
);
anyhow::ensure!(req.max_size <= PART_MAX_SIZE, "too large max_size");
Ok::<_, anyhow::Error>(())
};

if let Err(e) = persistent_state_request_validation() {
tracing::warn!("persistent_state_request_validation failed: {e:?}");
return overlay::Response::Err(INTERNAL_ERROR_CODE);
}

if !persistent_state_storage.state_exists(&req.mc_block_id, &req.block_id) {
return overlay::Response::Ok(PersistentStatePart::NotFound);
}

match persistent_state_storage
.read_state_part(&req.mc_block_id, &req.block_id, req.offset, req.max_size)
.await
{
Ok(data) => overlay::Response::Ok(PersistentStatePart::Found { data }),
Err(e) => {
// A read failure is reported to the peer as "not found", not as an error code.
tracing::debug!("failed to read persistent state part: {e}");
overlay::Response::Ok(PersistentStatePart::NotFound)
}
}
}

async fn handle_get_archive_info(
&self,
req: &rpc::GetArchiveInfo,
Expand Down Expand Up @@ -514,7 +485,7 @@ impl<B> Inner<B> {
let Some(archive_slice) = block_storage.get_archive_slice(
req.archive_id as u32,
req.offset as usize,
req.max_size as usize,
req.limit as usize,
)?
else {
anyhow::bail!("archive not found");
Expand All @@ -531,6 +502,60 @@ impl<B> Inner<B> {
}
}
}

/// Handles an `rpc::GetPersistentStateInfo` request.
///
/// Always responds with `Ok`: `PersistentStateInfo::Found` carrying the state size
/// when serving persistent states is enabled in the config and the storage has
/// info for `req.block_id`, and `PersistentStateInfo::NotFound` otherwise.
fn handle_get_persistent_state_info(
    &self,
    req: &rpc::GetPersistentStateInfo,
) -> overlay::Response<PersistentStateInfo> {
    let states = self.storage().persistent_state_storage();

    // The storage is consulted only when serving persistent states is enabled.
    let state_info = if self.config.serve_persistent_states {
        states.get_state_info(&req.block_id)
    } else {
        None
    };

    overlay::Response::Ok(match state_info {
        Some(info) => PersistentStateInfo::Found {
            size: info.size as u64,
        },
        None => PersistentStateInfo::NotFound,
    })
}

/// Handles an `rpc::GetPersistentStatePart` request addressed by `block_id`.
///
/// Responds with:
/// - `Err(BAD_REQUEST_ERROR_CODE)` when serving persistent states is disabled in
///   the config or `req.limit` exceeds `PART_MAX_SIZE`;
/// - `Err(NOT_FOUND_ERROR_CODE)` when the storage returns no data for the
///   requested part;
/// - `Ok(Data)` with the part bytes otherwise.
async fn handle_get_persistent_state_part(
    &self,
    req: &rpc::GetPersistentStatePart,
) -> overlay::Response<Data> {
    // Upper bound for a single requested part.
    const PART_MAX_SIZE: u64 = ByteSize::mib(2).as_u64();

    let persistent_state_storage = self.storage().persistent_state_storage();

    // Closure so that `anyhow::ensure!` can early-return a single validation error.
    let persistent_state_request_validation = || {
        anyhow::ensure!(
            self.config.serve_persistent_states,
            "persistent states are disabled"
        );
        // The message names the field that is actually checked (`limit`).
        anyhow::ensure!(u64::from(req.limit) <= PART_MAX_SIZE, "too large limit");
        Ok::<_, anyhow::Error>(())
    };

    if let Err(e) = persistent_state_request_validation() {
        tracing::debug!("persistent state request validation failed: {e:?}");
        return overlay::Response::Err(BAD_REQUEST_ERROR_CODE);
    }

    match persistent_state_storage
        .read_state_part(&req.block_id, req.limit, req.offset)
        .await
    {
        Some(data) => overlay::Response::Ok(Data { data: data.into() }),
        None => {
            tracing::debug!("failed to read persistent state part");
            overlay::Response::Err(NOT_FOUND_ERROR_CODE)
        }
    }
}
}

fn try_handle_prefix(req: &ServiceRequest) -> Result<(u32, &[u8]), tl_proto::TlError> {
Expand Down
1 change: 0 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,3 @@ pub mod blockchain_rpc;
pub mod global_config;
pub mod overlay_client;
pub mod proto;
pub mod sync;
Loading

0 comments on commit 68fac03

Please sign in to comment.