Skip to content

Commit

Permalink
Merge branch 'feature/transactions_gc'
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Jun 6, 2024
2 parents fcd36af + 4fb77d6 commit ba38bf0
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 11 deletions.
7 changes: 0 additions & 7 deletions rpc/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,12 @@ pub struct TransactionsGcConfig {
/// Default: `1 week`.
#[serde(with = "serde_helpers::humantime")]
pub tx_ttl: Duration,

/// Interval between garbage collection runs.
///
/// Default: `1 hour`.
#[serde(with = "serde_helpers::humantime")]
pub interval: Duration,
}

impl Default for TransactionsGcConfig {
fn default() -> Self {
Self {
tx_ttl: Duration::from_secs(60 * 60 * 24 * 7),
interval: Duration::from_secs(60 * 60),
}
}
}
102 changes: 98 additions & 4 deletions rpc/src/state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use anyhow::Result;
use anyhow::{Context, Result};
use arc_swap::{ArcSwap, ArcSwapOption};
use everscale_types::models::*;
use everscale_types::prelude::*;
use futures_util::future::BoxFuture;
use parking_lot::RwLock;
use serde_json::value::RawValue;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tycho_block_util::block::BlockStuff;
use tycho_block_util::state::{RefMcStateHandle, ShardStateStuff};
use tycho_core::block_strider::{
BlockSubscriber, BlockSubscriberContext, StateSubscriber, StateSubscriberContext,
};
use tycho_core::blockchain_rpc::BlockchainRpcClient;
use tycho_storage::{CodeHashesIter, Storage, TransactionsIterBuilder};
use tycho_storage::{CodeHashesIter, KeyBlocksDirection, Storage, TransactionsIterBuilder};
use tycho_util::time::now_sec;
use tycho_util::FastHashMap;

use crate::config::RpcConfig;
use crate::config::{RpcConfig, TransactionsGcConfig};
use crate::endpoint::RpcEndpoint;
use crate::models::{GenTimings, LatestBlockchainConfigRef, LatestKeyBlockRef, StateTimings};

Expand All @@ -31,6 +33,17 @@ impl RpcStateBuilder {
pub fn build(self) -> RpcState {
let (storage, blockchain_rpc_client) = self.mandatory_fields;

let gc_notify = Arc::new(Notify::new());
let gc_handle = if let Some(config) = &self.config.transactions_gc {
Some(tokio::spawn(transactions_gc(
config.clone(),
storage.clone(),
gc_notify.clone(),
)))
} else {
None
};

RpcState {
inner: Arc::new(Inner {
config: self.config,
Expand All @@ -42,6 +55,8 @@ impl RpcStateBuilder {
timings: ArcSwap::new(Default::default()),
latest_key_block_json: ArcSwapOption::default(),
blockchain_config_json: ArcSwapOption::default(),
gc_notify,
gc_handle,
}),
}
}
Expand Down Expand Up @@ -215,10 +230,13 @@ struct Inner {
timings: ArcSwap<StateTimings>,
latest_key_block_json: ArcSwapOption<Box<RawValue>>,
blockchain_config_json: ArcSwapOption<Box<RawValue>>,
// GC
gc_notify: Arc<Notify>,
gc_handle: Option<JoinHandle<()>>,
}

impl Inner {
async fn init(&self, mc_block_id: &BlockId) -> Result<()> {
async fn init(self: &Arc<Self>, mc_block_id: &BlockId) -> Result<()> {
anyhow::ensure!(mc_block_id.is_masterchain(), "not a masterchain state");

let blocks = self.storage.block_storage();
Expand Down Expand Up @@ -366,6 +384,11 @@ impl Inner {
}
}

// Send a new KeyBlock notification to run GC
if self.config.transactions_gc.is_some() {
self.gc_notify.notify_waiters();
}

let custom = block.load_custom()?;

// Try to update cached config:
Expand Down Expand Up @@ -483,6 +506,14 @@ impl Inner {
}
}

impl Drop for Inner {
fn drop(&mut self) {
if let Some(handle) = self.gc_handle.take() {
handle.abort();
}
}
}

pub enum LoadedAccountState {
NotFound {
timings: GenTimings,
Expand Down Expand Up @@ -523,6 +554,69 @@ type ShardAccountsDict = Dict<HashBytes, (DepthBalanceInfo, ShardAccount)>;

type CachedJson = Arc<Box<RawValue>>;

async fn transactions_gc(config: TransactionsGcConfig, storage: Storage, gc_notify: Arc<Notify>) {
let Some(persistent_storage) = storage.rpc_storage() else {
return;
};

let Ok(tx_ttl_sec) = config.tx_ttl.as_secs().try_into() else {
return;
};

loop {
// Wait for a new KeyBlock notification
gc_notify.notified().await;

let target_utime = now_sec().saturating_sub(tx_ttl_sec);
let min_lt = match find_closest_key_block_lt(&storage, target_utime).await {
Ok(lt) => lt,
Err(e) => {
tracing::error!(target_utime, "failed to find the closest key block lt: {e}");
continue;
}
};

if let Err(e) = persistent_storage.remove_old_transactions(min_lt).await {
tracing::error!(
target_utime,
min_lt,
"failed to remove old transactions: {e:?}"
);
}
}
}

async fn find_closest_key_block_lt(storage: &Storage, utime: u32) -> Result<u64> {
let block_handle_storage = storage.block_handle_storage();

// Find the key block with max seqno which was preduced not later than `utime`
let handle = 'last_key_block: {
let iter = block_handle_storage.key_blocks_iterator(KeyBlocksDirection::Backward);
for key_block_id in iter {
let handle = block_handle_storage
.load_handle(&key_block_id)
.with_context(|| format!("key block not found: {key_block_id}"))?;

if handle.meta().gen_utime() <= utime {
break 'last_key_block handle;
}
}

return Ok(0);
};

// Load block proof
let block_proof = storage
.block_storage()
.load_block_proof(&handle, false)
.await?;

// Read `start_lt` from virtual block info
let (virt_block, _) = block_proof.virtualize_block()?;
let info = virt_block.info.load()?;
Ok(info.start_lt)
}

#[derive(Debug, thiserror::Error)]
pub enum RpcStateError {
#[error("not ready")]
Expand Down

0 comments on commit ba38bf0

Please sign in to comment.