diff --git a/rpc/src/config.rs b/rpc/src/config.rs index 4db845625..ca4c0a24b 100644 --- a/rpc/src/config.rs +++ b/rpc/src/config.rs @@ -48,19 +48,12 @@ pub struct TransactionsGcConfig { /// Default: `1 week`. #[serde(with = "serde_helpers::humantime")] pub tx_ttl: Duration, - - /// Interval between garbage collection runs. - /// - /// Default: `1 hour`. - #[serde(with = "serde_helpers::humantime")] - pub interval: Duration, } impl Default for TransactionsGcConfig { fn default() -> Self { Self { tx_ttl: Duration::from_secs(60 * 60 * 24 * 7), - interval: Duration::from_secs(60 * 60), } } } diff --git a/rpc/src/state/mod.rs b/rpc/src/state/mod.rs index b06ec93d2..3f602f434 100644 --- a/rpc/src/state/mod.rs +++ b/rpc/src/state/mod.rs @@ -1,24 +1,26 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use anyhow::Result; +use anyhow::{Context, Result}; use arc_swap::{ArcSwap, ArcSwapOption}; use everscale_types::models::*; use everscale_types::prelude::*; use futures_util::future::BoxFuture; use parking_lot::RwLock; use serde_json::value::RawValue; +use tokio::sync::Notify; +use tokio::task::JoinHandle; use tycho_block_util::block::BlockStuff; use tycho_block_util::state::{RefMcStateHandle, ShardStateStuff}; use tycho_core::block_strider::{ BlockSubscriber, BlockSubscriberContext, StateSubscriber, StateSubscriberContext, }; use tycho_core::blockchain_rpc::BlockchainRpcClient; -use tycho_storage::{CodeHashesIter, Storage, TransactionsIterBuilder}; +use tycho_storage::{CodeHashesIter, KeyBlocksDirection, Storage, TransactionsIterBuilder}; use tycho_util::time::now_sec; use tycho_util::FastHashMap; -use crate::config::RpcConfig; +use crate::config::{RpcConfig, TransactionsGcConfig}; use crate::endpoint::RpcEndpoint; use crate::models::{GenTimings, LatestBlockchainConfigRef, LatestKeyBlockRef, StateTimings}; @@ -31,6 +33,17 @@ impl RpcStateBuilder { pub fn build(self) -> RpcState { let (storage, blockchain_rpc_client) = self.mandatory_fields; + let gc_notify = Arc::new(Notify::new()); + let gc_handle = if let Some(config) = &self.config.transactions_gc { + Some(tokio::spawn(transactions_gc( + config.clone(), + storage.clone(), + gc_notify.clone(), + ))) + } else { + None + }; + RpcState { inner: Arc::new(Inner { config: self.config, @@ -42,6 +55,8 @@ impl RpcStateBuilder { timings: ArcSwap::new(Default::default()), latest_key_block_json: ArcSwapOption::default(), blockchain_config_json: ArcSwapOption::default(), + gc_notify, + gc_handle, }), } } @@ -215,10 +230,13 @@ struct Inner { timings: ArcSwap, latest_key_block_json: ArcSwapOption>, blockchain_config_json: ArcSwapOption>, + // GC + gc_notify: Arc, + gc_handle: Option>, } impl Inner { - async fn init(&self, mc_block_id: &BlockId) -> Result<()> { + async fn init(self: &Arc, mc_block_id: &BlockId) -> Result<()> { anyhow::ensure!(mc_block_id.is_masterchain(), "not a masterchain state"); let blocks = self.storage.block_storage(); @@ -366,6 +384,11 @@ impl Inner { } } + // Send a new KeyBlock notification to run GC + if self.config.transactions_gc.is_some() { + self.gc_notify.notify_waiters(); + } + let custom = block.load_custom()?; // Try to update cached config: @@ -483,6 +506,14 @@ impl Inner { } } +impl Drop for Inner { + fn drop(&mut self) { + if let Some(handle) = self.gc_handle.take() { + handle.abort(); + } + } +} + pub enum LoadedAccountState { NotFound { timings: GenTimings, @@ -523,6 +554,69 @@ type ShardAccountsDict = Dict; type CachedJson = Arc>; +async fn transactions_gc(config: TransactionsGcConfig, storage: Storage, gc_notify: Arc) { + let Some(persistent_storage) = storage.rpc_storage() else { + return; + }; + + let Ok(tx_ttl_sec) = config.tx_ttl.as_secs().try_into() else { + return; + }; + + loop { + // Wait for a new KeyBlock notification + gc_notify.notified().await; + + let target_utime = now_sec().saturating_sub(tx_ttl_sec); + let min_lt = match find_closest_key_block_lt(&storage, target_utime).await { + Ok(lt) => lt, + Err(e) => { + tracing::error!(target_utime, "failed to find the closest key block lt: {e}"); + continue; + } + }; + + if let Err(e) = persistent_storage.remove_old_transactions(min_lt).await { + tracing::error!( + target_utime, + min_lt, + "failed to remove old transactions: {e:?}" + ); + } + } +} + +async fn find_closest_key_block_lt(storage: &Storage, utime: u32) -> Result { + let block_handle_storage = storage.block_handle_storage(); + + // Find the key block with max seqno which was preduced not later than `utime` + let handle = 'last_key_block: { + let iter = block_handle_storage.key_blocks_iterator(KeyBlocksDirection::Backward); + for key_block_id in iter { + let handle = block_handle_storage + .load_handle(&key_block_id) + .with_context(|| format!("key block not found: {key_block_id}"))?; + + if handle.meta().gen_utime() <= utime { + break 'last_key_block handle; + } + } + + return Ok(0); + }; + + // Load block proof + let block_proof = storage + .block_storage() + .load_block_proof(&handle, false) + .await?; + + // Read `start_lt` from virtual block info + let (virt_block, _) = block_proof.virtualize_block()?; + let info = virt_block.info.load()?; + Ok(info.start_lt) +} + #[derive(Debug, thiserror::Error)] pub enum RpcStateError { #[error("not ready")]