From 84338b28efba1416f30ffc13d4cd475d64959d6b Mon Sep 17 00:00:00 2001 From: Alexey Pashinov Date: Wed, 5 Jun 2024 18:25:02 +0200 Subject: [PATCH 1/4] feat(rpc): add transactions gc --- rpc/src/config.rs | 7 --- rpc/src/state/mod.rs | 108 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 105 insertions(+), 10 deletions(-) diff --git a/rpc/src/config.rs b/rpc/src/config.rs index 4db845625..ca4c0a24b 100644 --- a/rpc/src/config.rs +++ b/rpc/src/config.rs @@ -48,19 +48,12 @@ pub struct TransactionsGcConfig { /// Default: `1 week`. #[serde(with = "serde_helpers::humantime")] pub tx_ttl: Duration, - - /// Interval between garbage collection runs. - /// - /// Default: `1 hour`. - #[serde(with = "serde_helpers::humantime")] - pub interval: Duration, } impl Default for TransactionsGcConfig { fn default() -> Self { Self { tx_ttl: Duration::from_secs(60 * 60 * 24 * 7), - interval: Duration::from_secs(60 * 60), } } } diff --git a/rpc/src/state/mod.rs b/rpc/src/state/mod.rs index b06ec93d2..8aad4074f 100644 --- a/rpc/src/state/mod.rs +++ b/rpc/src/state/mod.rs @@ -1,20 +1,22 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use anyhow::Result; +use anyhow::{anyhow, Result}; use arc_swap::{ArcSwap, ArcSwapOption}; use everscale_types::models::*; use everscale_types::prelude::*; use futures_util::future::BoxFuture; use parking_lot::RwLock; use serde_json::value::RawValue; +use tokio::sync::Notify; +use tokio::task::JoinHandle; use tycho_block_util::block::BlockStuff; use tycho_block_util::state::{RefMcStateHandle, ShardStateStuff}; use tycho_core::block_strider::{ BlockSubscriber, BlockSubscriberContext, StateSubscriber, StateSubscriberContext, }; use tycho_core::blockchain_rpc::BlockchainRpcClient; -use tycho_storage::{CodeHashesIter, Storage, TransactionsIterBuilder}; +use tycho_storage::{CodeHashesIter, KeyBlocksDirection, Storage, TransactionsIterBuilder}; use tycho_util::time::now_sec; use tycho_util::FastHashMap; @@ -42,6 +44,7 @@ impl RpcStateBuilder { timings: ArcSwap::new(Default::default()), latest_key_block_json: ArcSwapOption::default(), blockchain_config_json: ArcSwapOption::default(), + transactions_gc: ArcSwapOption::default(), }), } } @@ -215,10 +218,11 @@ struct Inner { timings: ArcSwap, latest_key_block_json: ArcSwapOption>, blockchain_config_json: ArcSwapOption>, + transactions_gc: ArcSwapOption, } impl Inner { - async fn init(&self, mc_block_id: &BlockId) -> Result<()> { + async fn init(self: &Arc, mc_block_id: &BlockId) -> Result<()> { anyhow::ensure!(mc_block_id.is_masterchain(), "not a masterchain state"); let blocks = self.storage.block_storage(); @@ -286,6 +290,54 @@ impl Inner { } } + if let Some(gc) = &self.config.transactions_gc { + let gc = gc.clone(); + let this = Arc::downgrade(self); + let handle = tokio::spawn(async move { + loop { + let this = match this.upgrade() { + Some(item) => item, + None => return, + }; + + if let Some(transactions_gc) = &*this.transactions_gc.load() { + // Wait for a new KeyBlock notification + transactions_gc.notify.notified().await; + + let persistent_storage = match this.storage.rpc_storage() { + Some(persistent_storage) => persistent_storage, + None => return, + }; + + let target_utime = now_sec().saturating_sub(gc.tx_ttl.as_secs() as u32); + let min_lt = match this.find_closest_key_block_lt(target_utime).await { + Ok(lt) => lt, + Err(e) => { + tracing::error!( + target_utime, + "failed to find the closest key block lt: {e:?}" + ); + continue; + } + }; + + if let Err(e) = persistent_storage.remove_old_transactions(min_lt).await { + tracing::error!( + target_utime, + min_lt, + "failed to remove old transactions: {e:?}" + ); + } + } + } + }); + + self.transactions_gc.store(Some(Arc::new(TransactionsGc { + notify: Notify::new(), + handle, + }))); + } + self.is_ready.store(true, Ordering::Release); Ok(()) } @@ -366,6 +418,11 @@ impl Inner { } } + // Send a new KeyBlock notification to run GC + if let Some(transactions_gc) = &*self.transactions_gc.load() { + transactions_gc.notify.notify_waiters(); + } + let custom = block.load_custom()?; // Try to update cached config: @@ -481,6 +538,46 @@ impl Inner { Ok(accounts) } + + pub async fn find_closest_key_block_lt(&self, utime: u32) -> Result { + let block_handle_storage = self.storage.block_handle_storage(); + + // Find the key block with max seqno which was preduced not later than `utime` + let handle = 'last_key_block: { + let iter = block_handle_storage.key_blocks_iterator(KeyBlocksDirection::Backward); + for key_block in iter { + let handle = block_handle_storage + .load_handle(&key_block) + .ok_or(anyhow!("Key block not found"))?; + + if handle.meta().gen_utime() <= utime { + break 'last_key_block handle; + } + } + + return Ok(0); + }; + + // Load block proof + let block_proof = self + .storage + .block_storage() + .load_block_proof(&handle, false) + .await?; + + // Read `start_lt` from virtual block info + let (virt_block, _) = block_proof.virtualize_block()?; + let info = virt_block.info.load()?; + Ok(info.start_lt) + } +} + +impl Drop for Inner { + fn drop(&mut self) { + if let Some(transactions_gc) = &*self.transactions_gc.load() { + transactions_gc.handle.abort() + } + } } pub enum LoadedAccountState { @@ -519,6 +616,11 @@ impl CachedAccounts { } } +struct TransactionsGc { + notify: Notify, + handle: JoinHandle<()>, +} + type ShardAccountsDict = Dict; type CachedJson = Arc>; From 40f0d6313369f74b8368e3ba2b164e73f3751420 Mon Sep 17 00:00:00 2001 From: Alexey Pashinov Date: Thu, 6 Jun 2024 11:24:29 +0200 Subject: [PATCH 2/4] feat(rpc): remove gc_notify from under ArcSwap --- rpc/src/state/mod.rs | 71 ++++++++++++++++++++------------------------ 1 file changed, 32 insertions(+), 39 deletions(-) diff --git a/rpc/src/state/mod.rs b/rpc/src/state/mod.rs index 8aad4074f..345e39036 100644 --- a/rpc/src/state/mod.rs +++ b/rpc/src/state/mod.rs @@ -44,7 +44,8 @@ impl RpcStateBuilder { timings: ArcSwap::new(Default::default()), latest_key_block_json: ArcSwapOption::default(), blockchain_config_json: ArcSwapOption::default(), - transactions_gc: ArcSwapOption::default(), + gc_notify: Notify::new(), + gc_handle: ArcSwapOption::default(), }), } } @@ -218,7 +219,9 @@ struct Inner { timings: ArcSwap, latest_key_block_json: ArcSwapOption>, blockchain_config_json: ArcSwapOption>, - transactions_gc: ArcSwapOption, + // GC + gc_notify: Notify, + gc_handle: ArcSwapOption>, } impl Inner { @@ -300,42 +303,37 @@ impl Inner { None => return, }; - if let Some(transactions_gc) = &*this.transactions_gc.load() { - // Wait for a new KeyBlock notification - transactions_gc.notify.notified().await; - - let persistent_storage = match this.storage.rpc_storage() { - Some(persistent_storage) => persistent_storage, - None => return, - }; - - let target_utime = now_sec().saturating_sub(gc.tx_ttl.as_secs() as u32); - let min_lt = match this.find_closest_key_block_lt(target_utime).await { - Ok(lt) => lt, - Err(e) => { - tracing::error!( - target_utime, - "failed to find the closest key block lt: {e:?}" - ); - continue; - } - }; - - if let Err(e) = persistent_storage.remove_old_transactions(min_lt).await { + // Wait for a new KeyBlock notification + this.gc_notify.notified().await; + + let persistent_storage = match this.storage.rpc_storage() { + Some(persistent_storage) => persistent_storage, + None => return, + }; + + let target_utime = now_sec().saturating_sub(gc.tx_ttl.as_secs() as u32); + let min_lt = match this.find_closest_key_block_lt(target_utime).await { + Ok(lt) => lt, + Err(e) => { tracing::error!( target_utime, - min_lt, - "failed to remove old transactions: {e:?}" + "failed to find the closest key block lt: {e:?}" ); + continue; } + }; + + if let Err(e) = persistent_storage.remove_old_transactions(min_lt).await { + tracing::error!( + target_utime, + min_lt, + "failed to remove old transactions: {e:?}" + ); } } }); - self.transactions_gc.store(Some(Arc::new(TransactionsGc { - notify: Notify::new(), - handle, - }))); + self.gc_handle.store(Some(Arc::new(handle))); } self.is_ready.store(true, Ordering::Release); @@ -419,8 +417,8 @@ impl Inner { } // Send a new KeyBlock notification to run GC - if let Some(transactions_gc) = &*self.transactions_gc.load() { - transactions_gc.notify.notify_waiters(); + if self.config.transactions_gc.is_some() { + self.gc_notify.notify_waiters(); } let custom = block.load_custom()?; @@ -574,8 +572,8 @@ impl Inner { impl Drop for Inner { fn drop(&mut self) { - if let Some(transactions_gc) = &*self.transactions_gc.load() { - transactions_gc.handle.abort() + if let Some(handle) = &*self.gc_handle.load() { + handle.abort() } } } @@ -616,11 +614,6 @@ impl CachedAccounts { } } -struct TransactionsGc { - notify: Notify, - handle: JoinHandle<()>, -} - type ShardAccountsDict = Dict; type CachedJson = Arc>; From 1dc10f4ca32131858e13c564c6b33b93a44817e5 Mon Sep 17 00:00:00 2001 From: Alexey Pashinov Date: Thu, 6 Jun 2024 12:49:17 +0200 Subject: [PATCH 3/4] fix(rpc): clippy warnings --- rpc/src/state/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/src/state/mod.rs b/rpc/src/state/mod.rs index 345e39036..cc1e6ed4c 100644 --- a/rpc/src/state/mod.rs +++ b/rpc/src/state/mod.rs @@ -573,7 +573,7 @@ impl Inner { impl Drop for Inner { fn drop(&mut self) { if let Some(handle) = &*self.gc_handle.load() { - handle.abort() + handle.abort(); } } } From 4fb77d6bfd4e1c3730ae14e017d554f51e39fef0 Mon Sep 17 00:00:00 2001 From: Ivan Kalinin Date: Thu, 6 Jun 2024 16:07:42 +0200 Subject: [PATCH 4/4] refactor(rpc): simplify transactions gc start --- rpc/src/state/mod.rs | 163 +++++++++++++++++++++---------------------- 1 file changed, 81 insertions(+), 82 deletions(-) diff --git a/rpc/src/state/mod.rs b/rpc/src/state/mod.rs index cc1e6ed4c..3f602f434 100644 --- a/rpc/src/state/mod.rs +++ b/rpc/src/state/mod.rs @@ -1,7 +1,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use anyhow::{anyhow, Result}; +use anyhow::{Context, Result}; use arc_swap::{ArcSwap, ArcSwapOption}; use everscale_types::models::*; use everscale_types::prelude::*; @@ -20,7 +20,7 @@ use tycho_storage::{CodeHashesIter, KeyBlocksDirection, Storage, TransactionsIte use tycho_util::time::now_sec; use tycho_util::FastHashMap; -use crate::config::RpcConfig; +use crate::config::{RpcConfig, TransactionsGcConfig}; use crate::endpoint::RpcEndpoint; use crate::models::{GenTimings, LatestBlockchainConfigRef, LatestKeyBlockRef, StateTimings}; @@ -33,6 +33,17 @@ impl RpcStateBuilder { pub fn build(self) -> RpcState { let (storage, blockchain_rpc_client) = self.mandatory_fields; + let gc_notify = Arc::new(Notify::new()); + let gc_handle = if let Some(config) = &self.config.transactions_gc { + Some(tokio::spawn(transactions_gc( + config.clone(), + storage.clone(), + gc_notify.clone(), + ))) + } else { + None + }; + RpcState { inner: Arc::new(Inner { config: self.config, @@ -44,8 +55,8 @@ impl RpcStateBuilder { timings: ArcSwap::new(Default::default()), latest_key_block_json: ArcSwapOption::default(), blockchain_config_json: ArcSwapOption::default(), - gc_notify: Notify::new(), - gc_handle: ArcSwapOption::default(), + gc_notify, + gc_handle, }), } } @@ -220,8 +231,8 @@ struct Inner { latest_key_block_json: ArcSwapOption>, blockchain_config_json: ArcSwapOption>, // GC - gc_notify: Notify, - gc_handle: ArcSwapOption>, + gc_notify: Arc, + gc_handle: Option>, } impl Inner { @@ -293,49 +304,6 @@ impl Inner { } } - if let Some(gc) = &self.config.transactions_gc { - let gc = gc.clone(); - let this = Arc::downgrade(self); - let handle = tokio::spawn(async move { - loop { - let this = match this.upgrade() { - Some(item) => item, - None => return, - }; - - // Wait for a new KeyBlock notification - this.gc_notify.notified().await; - - let persistent_storage = match this.storage.rpc_storage() { - Some(persistent_storage) => persistent_storage, - None => return, - }; - - let target_utime = now_sec().saturating_sub(gc.tx_ttl.as_secs() as u32); - let min_lt = match this.find_closest_key_block_lt(target_utime).await { - Ok(lt) => lt, - Err(e) => { - tracing::error!( - target_utime, - "failed to find the closest key block lt: {e:?}" - ); - continue; - } - }; - - if let Err(e) = persistent_storage.remove_old_transactions(min_lt).await { - tracing::error!( - target_utime, - min_lt, - "failed to remove old transactions: {e:?}" - ); - } - } - }); - - self.gc_handle.store(Some(Arc::new(handle))); - } - self.is_ready.store(true, Ordering::Release); Ok(()) } @@ -536,43 +504,11 @@ impl Inner { Ok(accounts) } - - pub async fn find_closest_key_block_lt(&self, utime: u32) -> Result { - let block_handle_storage = self.storage.block_handle_storage(); - - // Find the key block with max seqno which was preduced not later than `utime` - let handle = 'last_key_block: { - let iter = block_handle_storage.key_blocks_iterator(KeyBlocksDirection::Backward); - for key_block in iter { - let handle = block_handle_storage - .load_handle(&key_block) - .ok_or(anyhow!("Key block not found"))?; - - if handle.meta().gen_utime() <= utime { - break 'last_key_block handle; - } - } - - return Ok(0); - }; - - // Load block proof - let block_proof = self - .storage - .block_storage() - .load_block_proof(&handle, false) - .await?; - - // Read `start_lt` from virtual block info - let (virt_block, _) = block_proof.virtualize_block()?; - let info = virt_block.info.load()?; - Ok(info.start_lt) - } } impl Drop for Inner { fn drop(&mut self) { - if let Some(handle) = &*self.gc_handle.load() { + if let Some(handle) = self.gc_handle.take() { handle.abort(); } } @@ -618,6 +554,69 @@ type ShardAccountsDict = Dict; type CachedJson = Arc>; +async fn transactions_gc(config: TransactionsGcConfig, storage: Storage, gc_notify: Arc) { + let Some(persistent_storage) = storage.rpc_storage() else { + return; + }; + + let Ok(tx_ttl_sec) = config.tx_ttl.as_secs().try_into() else { + return; + }; + + loop { + // Wait for a new KeyBlock notification + gc_notify.notified().await; + + let target_utime = now_sec().saturating_sub(tx_ttl_sec); + let min_lt = match find_closest_key_block_lt(&storage, target_utime).await { + Ok(lt) => lt, + Err(e) => { + tracing::error!(target_utime, "failed to find the closest key block lt: {e}"); + continue; + } + }; + + if let Err(e) = persistent_storage.remove_old_transactions(min_lt).await { + tracing::error!( + target_utime, + min_lt, + "failed to remove old transactions: {e:?}" + ); + } + } +} + +async fn find_closest_key_block_lt(storage: &Storage, utime: u32) -> Result { + let block_handle_storage = storage.block_handle_storage(); + + // Find the key block with max seqno which was preduced not later than `utime` + let handle = 'last_key_block: { + let iter = block_handle_storage.key_blocks_iterator(KeyBlocksDirection::Backward); + for key_block_id in iter { + let handle = block_handle_storage + .load_handle(&key_block_id) + .with_context(|| format!("key block not found: {key_block_id}"))?; + + if handle.meta().gen_utime() <= utime { + break 'last_key_block handle; + } + } + + return Ok(0); + }; + + // Load block proof + let block_proof = storage + .block_storage() + .load_block_proof(&handle, false) + .await?; + + // Read `start_lt` from virtual block info + let (virt_block, _) = block_proof.virtualize_block()?; + let info = virt_block.info.load()?; + Ok(info.start_lt) +} + #[derive(Debug, thiserror::Error)] pub enum RpcStateError { #[error("not ready")]