From d9b44f44b72442a02bfc4d7c48ffc1d056cc35ab Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Thu, 10 Oct 2024 16:24:17 -0700 Subject: [PATCH 1/8] modify is_stable to be indexer progressing and height caught up --- chain-signatures/node/src/mesh/connection.rs | 52 +++++++++++++++++++- chain-signatures/node/src/mesh/mod.rs | 10 +++- chain-signatures/node/src/web/mod.rs | 10 ++-- 3 files changed, 64 insertions(+), 8 deletions(-) diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index 310f4363..936c3836 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::time::{Duration, Instant}; use cait_sith::protocol::Participant; +use near_primitives::types::BlockHeight; use tokio::sync::RwLock; use url::Url; @@ -165,13 +166,36 @@ impl Pool { self.potential_connections.read().await.clone() } - pub async fn is_participant_stable(&self, participant: &Participant) -> bool { + async fn max_block_height_among_participants(&self) -> BlockHeight { + self.status + .read() + .await + .values() + .filter_map(|state| { + if let StateView::Running { + latest_block_height, + .. + } = state + { + Some(*latest_block_height) + } else { + None + } + }) + .max() + .unwrap_or(0) + } + + pub async fn is_participant_indexer_progressing(&self, participant: &Participant) -> bool { self.status .read() .await .get(participant) .map_or(false, |state| match state { - StateView::Running { is_stable, .. } => *is_stable, + StateView::Running { + is_indexer_progressing, + .. + } => *is_indexer_progressing, _ => false, }) } @@ -213,4 +237,28 @@ impl Pool { ) .await } + + pub async fn is_participant_indexer_caught_up(&self, participant: &Participant) -> bool { + let max_block_height = self.max_block_height_among_participants().await; + + if max_block_height == 0 { + return false; + } + + let my_block_height = self + .status + .read() + .await + .get(participant) + .and_then(|state| match state { + StateView::Running { + latest_block_height, + .. + } => Some(*latest_block_height), + _ => None, + }) + .unwrap_or(0); + + (max_block_height - my_block_height) < 50 + } } diff --git a/chain-signatures/node/src/mesh/mod.rs b/chain-signatures/node/src/mesh/mod.rs index 7dcedbcc..2ba03a01 100644 --- a/chain-signatures/node/src/mesh/mod.rs +++ b/chain-signatures/node/src/mesh/mod.rs @@ -89,7 +89,15 @@ impl Mesh { pub async fn stable_participants(&self) -> Participants { let mut stable = Participants::default(); for (participant, info) in self.active_participants().iter() { - if self.connections.is_participant_stable(participant).await { + if self + .connections + .is_participant_indexer_progressing(participant) + .await + && self + .connections + .is_participant_indexer_caught_up(participant) + .await + { stable.insert(participant, info.clone()); } } diff --git a/chain-signatures/node/src/web/mod.rs b/chain-signatures/node/src/web/mod.rs index 9161b05a..8e3f9d50 100644 --- a/chain-signatures/node/src/web/mod.rs +++ b/chain-signatures/node/src/web/mod.rs @@ -112,13 +112,13 @@ pub enum StateView { presignature_mine_count: usize, presignature_potential_count: usize, latest_block_height: BlockHeight, - is_stable: bool, + is_indexer_progressing: bool, }, Resharing { old_participants: Vec, new_participants: Vec, latest_block_height: BlockHeight, - is_stable: bool, + is_indexer_progressing: bool, }, Joining { participants: Vec, @@ -131,7 +131,7 @@ pub enum StateView { async fn state(Extension(state): Extension>) -> Result> { tracing::debug!("fetching state"); let latest_block_height = state.indexer.latest_block_height().await; - let is_stable = state.indexer.is_on_track().await; + let is_indexer_progressing = state.indexer.is_on_track().await; let protocol_state = state.protocol_state.read().await; match &*protocol_state { @@ -155,7 +155,7 @@ async fn state(Extension(state): Extension>) -> Result { @@ -165,7 +165,7 @@ async fn state(Extension(state): Extension>) -> Result { From 430678b254b3958bd32def4bbb3998448a783de3 Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Fri, 11 Oct 2024 13:15:39 -0700 Subject: [PATCH 2/8] pull max height from rpc endpoint --- chain-signatures/node/src/cli.rs | 15 +++--- chain-signatures/node/src/indexer.rs | 38 ++++++++++++-- chain-signatures/node/src/mesh/connection.rs | 52 +------------------- chain-signatures/node/src/mesh/mod.rs | 10 +--- chain-signatures/node/src/rpc_client.rs | 17 +++++++ chain-signatures/node/src/web/mod.rs | 10 ++-- 6 files changed, 68 insertions(+), 74 deletions(-) diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index 25bbf744..77e372e6 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -193,12 +193,21 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { .build()?; let gcp_service = rt.block_on(async { GcpService::init(&account_id, &storage_options).await })?; + + let mut rpc_client = near_fetch::Client::new(&near_rpc); + if let Some(referer_param) = client_header_referer { + let client_headers = rpc_client.inner_mut().headers_mut(); + client_headers.insert(http::header::REFERER, referer_param.parse().unwrap()); + } + tracing::info!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized"); + let (indexer_handle, indexer) = indexer::run( &indexer_options, &mpc_contract_id, &account_id, &sign_queue, &gcp_service, + rpc_client.clone(), &rt, )?; @@ -222,13 +231,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { let (sender, receiver) = mpsc::channel(16384); tracing::info!(%my_address, "address detected"); - let mut rpc_client = near_fetch::Client::new(&near_rpc); - if let Some(referer_param) = client_header_referer { - let client_headers = rpc_client.inner_mut().headers_mut(); - client_headers.insert(http::header::REFERER, referer_param.parse().unwrap()); - } - tracing::info!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized"); let signer = InMemorySigner::from_secret_key(account_id.clone(), account_sk); let (protocol, protocol_state) = MpcSignProtocol::init( my_address, diff --git a/chain-signatures/node/src/indexer.rs b/chain-signatures/node/src/indexer.rs index 4239dd55..d944e66b 100644 --- a/chain-signatures/node/src/indexer.rs +++ b/chain-signatures/node/src/indexer.rs @@ -1,6 +1,7 @@ use crate::gcp::error::DatastoreStorageError; use crate::gcp::GcpService; use crate::protocol::{SignQueue, SignRequest}; +use crate::rpc_client; use crate::types::LatestBlockHeight; use crypto_shared::{derive_epsilon, ScalarExt}; use k256::Scalar; @@ -53,6 +54,14 @@ pub struct Options { /// The threshold in seconds to check if the indexer needs to be restarted due to it stalling. #[clap(long, env("MPC_INDEXER_RUNNING_THRESHOLD"), default_value = "300")] pub running_threshold: u64, + + /// The threshold in block height lag to check if the indexer has caught up. + #[clap( + long, + env("MPC_INDEXER_BLOCK_HEIGHT_LAG_THRESHOLD"), + default_value = "50" + )] + pub block_height_lag_threshold: u64, } impl Options { @@ -68,6 +77,8 @@ impl Options { self.behind_threshold.to_string(), "--running-threshold".to_string(), self.running_threshold.to_string(), + "--block-height-lag-threshold".to_string(), + self.block_height_lag_threshold.to_string(), ]; if let Some(s3_url) = self.s3_url { @@ -105,10 +116,16 @@ pub struct Indexer { last_updated_timestamp: Arc>, running_threshold: Duration, behind_threshold: Duration, + block_height_lag_threshold: u64, + rpc_client: near_fetch::Client, } impl Indexer { - fn new(latest_block_height: LatestBlockHeight, options: &Options) -> Self { + fn new( + latest_block_height: LatestBlockHeight, + options: &Options, + rpc_client: near_fetch::Client, + ) -> Self { tracing::info!( "creating new indexer, latest block height: {}", latest_block_height.block_height @@ -118,6 +135,8 @@ impl Indexer { last_updated_timestamp: Arc::new(RwLock::new(Instant::now())), running_threshold: Duration::from_secs(options.running_threshold), behind_threshold: Duration::from_secs(options.behind_threshold), + block_height_lag_threshold: options.block_height_lag_threshold, + rpc_client, } } @@ -126,7 +145,7 @@ impl Indexer { self.latest_block_height.read().await.block_height } - /// Check whether the indexer is on track with the latest block height from the chain. + /// Check whether the indexer block height has been updated recently. pub async fn is_on_track(&self) -> bool { self.last_updated_timestamp.read().await.elapsed() <= self.behind_threshold } @@ -138,7 +157,17 @@ impl Indexer { /// Check whether the indexer is behind with the latest block height from the chain. pub async fn is_behind(&self) -> bool { - self.last_updated_timestamp.read().await.elapsed() > self.behind_threshold + let network_latest_height = rpc_client::fetch_latest_block_height(&self.rpc_client).await; + if let Ok(network_latest_height) = network_latest_height { + self.latest_block_height().await + < network_latest_height - self.block_height_lag_threshold + } else { + false + } + } + + pub async fn is_stable(&self) -> bool { + !self.is_behind().await && self.is_on_track().await } async fn update_block_height( @@ -287,6 +316,7 @@ pub fn run( node_account_id: &AccountId, queue: &Arc>, gcp_service: &crate::gcp::GcpService, + rpc_client: near_fetch::Client, rt: &tokio::runtime::Runtime, ) -> anyhow::Result<(JoinHandle>, Indexer)> { tracing::info!( @@ -311,7 +341,7 @@ pub fn run( } }); - let indexer = Indexer::new(latest_block_height, options); + let indexer = Indexer::new(latest_block_height, options, rpc_client); let context = Context { mpc_contract_id: mpc_contract_id.clone(), node_account_id: node_account_id.clone(), diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index 936c3836..310f4363 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::time::{Duration, Instant}; use cait_sith::protocol::Participant; -use near_primitives::types::BlockHeight; use tokio::sync::RwLock; use url::Url; @@ -166,36 +165,13 @@ impl Pool { self.potential_connections.read().await.clone() } - async fn max_block_height_among_participants(&self) -> BlockHeight { - self.status - .read() - .await - .values() - .filter_map(|state| { - if let StateView::Running { - latest_block_height, - .. - } = state - { - Some(*latest_block_height) - } else { - None - } - }) - .max() - .unwrap_or(0) - } - - pub async fn is_participant_indexer_progressing(&self, participant: &Participant) -> bool { + pub async fn is_participant_stable(&self, participant: &Participant) -> bool { self.status .read() .await .get(participant) .map_or(false, |state| match state { - StateView::Running { - is_indexer_progressing, - .. - } => *is_indexer_progressing, + StateView::Running { is_stable, .. } => *is_stable, _ => false, }) } @@ -237,28 +213,4 @@ impl Pool { ) .await } - - pub async fn is_participant_indexer_caught_up(&self, participant: &Participant) -> bool { - let max_block_height = self.max_block_height_among_participants().await; - - if max_block_height == 0 { - return false; - } - - let my_block_height = self - .status - .read() - .await - .get(participant) - .and_then(|state| match state { - StateView::Running { - latest_block_height, - .. - } => Some(*latest_block_height), - _ => None, - }) - .unwrap_or(0); - - (max_block_height - my_block_height) < 50 - } } diff --git a/chain-signatures/node/src/mesh/mod.rs b/chain-signatures/node/src/mesh/mod.rs index 2ba03a01..7dcedbcc 100644 --- a/chain-signatures/node/src/mesh/mod.rs +++ b/chain-signatures/node/src/mesh/mod.rs @@ -89,15 +89,7 @@ impl Mesh { pub async fn stable_participants(&self) -> Participants { let mut stable = Participants::default(); for (participant, info) in self.active_participants().iter() { - if self - .connections - .is_participant_indexer_progressing(participant) - .await - && self - .connections - .is_participant_indexer_caught_up(participant) - .await - { + if self.connections.is_participant_stable(participant).await { stable.insert(participant, info.clone()); } } diff --git a/chain-signatures/node/src/rpc_client.rs b/chain-signatures/node/src/rpc_client.rs index 6917af9b..b0298261 100644 --- a/chain-signatures/node/src/rpc_client.rs +++ b/chain-signatures/node/src/rpc_client.rs @@ -4,6 +4,7 @@ use crate::protocol::ProtocolState; use near_account_id::AccountId; use near_crypto::InMemorySigner; +use near_primitives::types::BlockHeight; use serde_json::json; pub async fn fetch_mpc_contract_state( @@ -99,3 +100,19 @@ pub async fn vote_reshared( Ok(result) } + +pub async fn fetch_latest_block_height( + rpc_client: &near_fetch::Client, +) -> anyhow::Result { + let latest_block_height: BlockHeight = rpc_client + .view_block() + .await + .map_err(|e| { + tracing::warn!(%e, "failed to fetch latest block"); + e + })? + .header + .height; + tracing::debug!(latest_block_height, "latest block height"); + Ok(latest_block_height) +} diff --git a/chain-signatures/node/src/web/mod.rs b/chain-signatures/node/src/web/mod.rs index 8e3f9d50..3e045cc6 100644 --- a/chain-signatures/node/src/web/mod.rs +++ b/chain-signatures/node/src/web/mod.rs @@ -112,13 +112,13 @@ pub enum StateView { presignature_mine_count: usize, presignature_potential_count: usize, latest_block_height: BlockHeight, - is_indexer_progressing: bool, + is_stable: bool, }, Resharing { old_participants: Vec, new_participants: Vec, latest_block_height: BlockHeight, - is_indexer_progressing: bool, + is_stable: bool, }, Joining { participants: Vec, @@ -131,7 +131,7 @@ pub enum StateView { async fn state(Extension(state): Extension>) -> Result> { tracing::debug!("fetching state"); let latest_block_height = state.indexer.latest_block_height().await; - let is_indexer_progressing = state.indexer.is_on_track().await; + let is_stable = state.indexer.is_stable().await; let protocol_state = state.protocol_state.read().await; match &*protocol_state { @@ -155,7 +155,7 @@ async fn state(Extension(state): Extension>) -> Result { @@ -165,7 +165,7 @@ async fn state(Extension(state): Extension>) -> Result { From ae3f37fed2bdd37a2eee9d13facafeb4261306c7 Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Fri, 11 Oct 2024 14:32:30 -0700 Subject: [PATCH 3/8] fix integration tests --- integration-tests/chain-signatures/src/containers.rs | 1 + integration-tests/chain-signatures/src/local.rs | 2 ++ 2 files changed, 3 insertions(+) diff --git a/integration-tests/chain-signatures/src/containers.rs b/integration-tests/chain-signatures/src/containers.rs index b6b69d17..f719fdef 100644 --- a/integration-tests/chain-signatures/src/containers.rs +++ b/integration-tests/chain-signatures/src/containers.rs @@ -99,6 +99,7 @@ impl<'a> Node<'a> { start_block_height: 0, running_threshold: 120, behind_threshold: 120, + block_height_lag_threshold: 50, }; let args = mpc_node::cli::Cli::Start { diff --git a/integration-tests/chain-signatures/src/local.rs b/integration-tests/chain-signatures/src/local.rs index 923ccb15..5c050f05 100644 --- a/integration-tests/chain-signatures/src/local.rs +++ b/integration-tests/chain-signatures/src/local.rs @@ -55,6 +55,7 @@ impl Node { start_block_height: 0, running_threshold: 120, behind_threshold: 120, + block_height_lag_threshold: 50, }; let near_rpc = ctx.lake_indexer.rpc_host_address.clone(); let mpc_contract_id = ctx.mpc_contract.id().clone(); @@ -150,6 +151,7 @@ impl Node { start_block_height: 0, running_threshold: 120, behind_threshold: 120, + block_height_lag_threshold: 50, }; let cli = mpc_node::cli::Cli::Start { near_rpc: config.near_rpc.clone(), From 2fd361a007a5f5494a1611b45af4367649ef2942 Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Tue, 15 Oct 2024 14:36:30 -0700 Subject: [PATCH 4/8] address comments --- chain-signatures/node/src/indexer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain-signatures/node/src/indexer.rs b/chain-signatures/node/src/indexer.rs index d944e66b..87d56380 100644 --- a/chain-signatures/node/src/indexer.rs +++ b/chain-signatures/node/src/indexer.rs @@ -162,7 +162,7 @@ impl Indexer { self.latest_block_height().await < network_latest_height - self.block_height_lag_threshold } else { - false + true } } From 04da1fafc4ddda8b6e8c7b1d3e20c9cd2ba14d1b Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Wed, 16 Oct 2024 15:15:45 -0700 Subject: [PATCH 5/8] use 200 as default to allow 3 min delay from latest rpc fetch --- chain-signatures/node/src/indexer.rs | 2 +- integration-tests/chain-signatures/src/containers.rs | 2 +- integration-tests/chain-signatures/src/local.rs | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/chain-signatures/node/src/indexer.rs b/chain-signatures/node/src/indexer.rs index 87d56380..65520c34 100644 --- a/chain-signatures/node/src/indexer.rs +++ b/chain-signatures/node/src/indexer.rs @@ -59,7 +59,7 @@ pub struct Options { #[clap( long, env("MPC_INDEXER_BLOCK_HEIGHT_LAG_THRESHOLD"), - default_value = "50" + default_value = "200" )] pub block_height_lag_threshold: u64, } diff --git a/integration-tests/chain-signatures/src/containers.rs b/integration-tests/chain-signatures/src/containers.rs index f719fdef..6801548f 100644 --- a/integration-tests/chain-signatures/src/containers.rs +++ b/integration-tests/chain-signatures/src/containers.rs @@ -99,7 +99,7 @@ impl<'a> Node<'a> { start_block_height: 0, running_threshold: 120, behind_threshold: 120, - block_height_lag_threshold: 50, + block_height_lag_threshold: 200, }; let args = mpc_node::cli::Cli::Start { diff --git a/integration-tests/chain-signatures/src/local.rs b/integration-tests/chain-signatures/src/local.rs index 5c050f05..b930dd53 100644 --- a/integration-tests/chain-signatures/src/local.rs +++ b/integration-tests/chain-signatures/src/local.rs @@ -55,7 +55,7 @@ impl Node { start_block_height: 0, running_threshold: 120, behind_threshold: 120, - block_height_lag_threshold: 50, + block_height_lag_threshold: 200, }; let near_rpc = ctx.lake_indexer.rpc_host_address.clone(); let mpc_contract_id = ctx.mpc_contract.id().clone(); @@ -151,7 +151,7 @@ impl Node { start_block_height: 0, running_threshold: 120, behind_threshold: 120, - block_height_lag_threshold: 50, + block_height_lag_threshold: 200, }; let cli = mpc_node::cli::Cli::Start { near_rpc: config.near_rpc.clone(), From 04a9e9fcfc541c71458d64211ab81e170389625f Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Thu, 17 Oct 2024 06:56:17 -0700 Subject: [PATCH 6/8] in progress --- integration-tests/chain-signatures/src/containers.rs | 2 +- integration-tests/chain-signatures/src/local.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/integration-tests/chain-signatures/src/containers.rs b/integration-tests/chain-signatures/src/containers.rs index 6801548f..3ad699ea 100644 --- a/integration-tests/chain-signatures/src/containers.rs +++ b/integration-tests/chain-signatures/src/containers.rs @@ -99,7 +99,7 @@ impl<'a> Node<'a> { start_block_height: 0, running_threshold: 120, behind_threshold: 120, - block_height_lag_threshold: 200, + block_height_lag_threshold: 2000, }; let args = mpc_node::cli::Cli::Start { diff --git a/integration-tests/chain-signatures/src/local.rs b/integration-tests/chain-signatures/src/local.rs index b930dd53..436bd7e2 100644 --- a/integration-tests/chain-signatures/src/local.rs +++ b/integration-tests/chain-signatures/src/local.rs @@ -55,7 +55,7 @@ impl Node { start_block_height: 0, running_threshold: 120, behind_threshold: 120, - block_height_lag_threshold: 200, + block_height_lag_threshold: 2000, }; let near_rpc = ctx.lake_indexer.rpc_host_address.clone(); let mpc_contract_id = ctx.mpc_contract.id().clone(); @@ -151,7 +151,7 @@ impl Node { start_block_height: 0, running_threshold: 120, behind_threshold: 120, - block_height_lag_threshold: 200, + block_height_lag_threshold: 2000, }; let cli = mpc_node::cli::Cli::Start { near_rpc: config.near_rpc.clone(), From f9f368623cec9aabc54f421caf74398cd0316062 Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Tue, 29 Oct 2024 17:17:42 -0700 Subject: [PATCH 7/8] use block.header().timestamp_nanoseconds instead of fetching latest block with rpc client --- chain-signatures/node/src/cli.rs | 15 ++--- chain-signatures/node/src/indexer.rs | 58 +++++++------------ chain-signatures/node/src/rpc_client.rs | 17 ------ .../chain-signatures/src/containers.rs | 1 - .../chain-signatures/src/local.rs | 2 - 5 files changed, 28 insertions(+), 65 deletions(-) diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index 77e372e6..25bbf744 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -193,21 +193,12 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { .build()?; let gcp_service = rt.block_on(async { GcpService::init(&account_id, &storage_options).await })?; - - let mut rpc_client = near_fetch::Client::new(&near_rpc); - if let Some(referer_param) = client_header_referer { - let client_headers = rpc_client.inner_mut().headers_mut(); - client_headers.insert(http::header::REFERER, referer_param.parse().unwrap()); - } - tracing::info!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized"); - let (indexer_handle, indexer) = indexer::run( &indexer_options, &mpc_contract_id, &account_id, &sign_queue, &gcp_service, - rpc_client.clone(), &rt, )?; @@ -231,7 +222,13 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { let (sender, receiver) = mpsc::channel(16384); tracing::info!(%my_address, "address detected"); + let mut rpc_client = near_fetch::Client::new(&near_rpc); + if let Some(referer_param) = client_header_referer { + let client_headers = rpc_client.inner_mut().headers_mut(); + client_headers.insert(http::header::REFERER, referer_param.parse().unwrap()); + } + tracing::info!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized"); let signer = InMemorySigner::from_secret_key(account_id.clone(), account_sk); let (protocol, protocol_state) = MpcSignProtocol::init( my_address, diff --git a/chain-signatures/node/src/indexer.rs b/chain-signatures/node/src/indexer.rs index 65520c34..b652c0e5 100644 --- a/chain-signatures/node/src/indexer.rs +++ b/chain-signatures/node/src/indexer.rs @@ -1,7 +1,6 @@ use crate::gcp::error::DatastoreStorageError; use crate::gcp::GcpService; use crate::protocol::{SignQueue, SignRequest}; -use crate::rpc_client; use crate::types::LatestBlockHeight; use crypto_shared::{derive_epsilon, ScalarExt}; use k256::Scalar; @@ -48,20 +47,12 @@ pub struct Options { pub start_block_height: u64, /// The amount of time before we should that our indexer is behind. - #[clap(long, env("MPC_INDEXER_BEHIND_THRESHOLD"), default_value = "180")] + #[clap(long, env("MPC_INDEXER_BEHIND_THRESHOLD"), default_value = "200")] pub behind_threshold: u64, /// The threshold in seconds to check if the indexer needs to be restarted due to it stalling. #[clap(long, env("MPC_INDEXER_RUNNING_THRESHOLD"), default_value = "300")] pub running_threshold: u64, - - /// The threshold in block height lag to check if the indexer has caught up. - #[clap( - long, - env("MPC_INDEXER_BLOCK_HEIGHT_LAG_THRESHOLD"), - default_value = "200" - )] - pub block_height_lag_threshold: u64, } impl Options { @@ -77,8 +68,6 @@ impl Options { self.behind_threshold.to_string(), "--running-threshold".to_string(), self.running_threshold.to_string(), - "--block-height-lag-threshold".to_string(), - self.block_height_lag_threshold.to_string(), ]; if let Some(s3_url) = self.s3_url { @@ -114,18 +103,13 @@ pub struct ContractSignRequest { pub struct Indexer { latest_block_height: Arc>, last_updated_timestamp: Arc>, + latest_block_timestamp_nanoseconds: Arc>>, running_threshold: Duration, behind_threshold: Duration, - block_height_lag_threshold: u64, - rpc_client: near_fetch::Client, } impl Indexer { - fn new( - latest_block_height: LatestBlockHeight, - options: &Options, - rpc_client: near_fetch::Client, - ) -> Self { + fn new(latest_block_height: LatestBlockHeight, options: &Options) -> Self { tracing::info!( "creating new indexer, latest block height: {}", latest_block_height.block_height @@ -133,10 +117,9 @@ impl Indexer { Self { latest_block_height: Arc::new(RwLock::new(latest_block_height)), last_updated_timestamp: Arc::new(RwLock::new(Instant::now())), + latest_block_timestamp_nanoseconds: Arc::new(RwLock::new(None)), running_threshold: Duration::from_secs(options.running_threshold), behind_threshold: Duration::from_secs(options.behind_threshold), - block_height_lag_threshold: options.block_height_lag_threshold, - rpc_client, } } @@ -145,11 +128,6 @@ impl Indexer { self.latest_block_height.read().await.block_height } - /// Check whether the indexer block height has been updated recently. - pub async fn is_on_track(&self) -> bool { - self.last_updated_timestamp.read().await.elapsed() <= self.behind_threshold - } - /// Check whether the indexer is on track with the latest block height from the chain. pub async fn is_running(&self) -> bool { self.last_updated_timestamp.read().await.elapsed() <= self.running_threshold @@ -157,26 +135,31 @@ impl Indexer { /// Check whether the indexer is behind with the latest block height from the chain. pub async fn is_behind(&self) -> bool { - let network_latest_height = rpc_client::fetch_latest_block_height(&self.rpc_client).await; - if let Ok(network_latest_height) = network_latest_height { - self.latest_block_height().await - < network_latest_height - self.block_height_lag_threshold + if let Some(latest_block_timestamp_nanoseconds) = + *self.latest_block_timestamp_nanoseconds.read().await + { + crate::util::is_elapsed_longer_than_timeout( + latest_block_timestamp_nanoseconds / 1000000000, + self.behind_threshold.as_millis() as u64, + ) } else { true } } pub async fn is_stable(&self) -> bool { - !self.is_behind().await && self.is_on_track().await + !self.is_behind().await && self.is_running().await } - async fn update_block_height( + async fn update_block_height_and_timestamp( &self, block_height: BlockHeight, + block_timestamp_nanoseconds: u64, gcp: &GcpService, ) -> Result<(), DatastoreStorageError> { - tracing::debug!(block_height, "update_block_height"); + tracing::debug!(block_height, "update_block_height_and_timestamp"); *self.last_updated_timestamp.write().await = Instant::now(); + *self.latest_block_timestamp_nanoseconds.write().await = Some(block_timestamp_nanoseconds); self.latest_block_height .write() .await @@ -280,7 +263,11 @@ async fn handle_block( } ctx.indexer - .update_block_height(block.block_height(), &ctx.gcp_service) + .update_block_height_and_timestamp( + block.block_height(), + block.header().timestamp_nanosec(), + &ctx.gcp_service, + ) .await?; crate::metrics::LATEST_BLOCK_HEIGHT @@ -316,7 +303,6 @@ pub fn run( node_account_id: &AccountId, queue: &Arc>, gcp_service: &crate::gcp::GcpService, - rpc_client: near_fetch::Client, rt: &tokio::runtime::Runtime, ) -> anyhow::Result<(JoinHandle>, Indexer)> { tracing::info!( @@ -341,7 +327,7 @@ pub fn run( } }); - let indexer = Indexer::new(latest_block_height, options, rpc_client); + let indexer = Indexer::new(latest_block_height, options); let context = Context { mpc_contract_id: mpc_contract_id.clone(), node_account_id: node_account_id.clone(), diff --git a/chain-signatures/node/src/rpc_client.rs b/chain-signatures/node/src/rpc_client.rs index b0298261..6917af9b 100644 --- a/chain-signatures/node/src/rpc_client.rs +++ b/chain-signatures/node/src/rpc_client.rs @@ -4,7 +4,6 @@ use crate::protocol::ProtocolState; use near_account_id::AccountId; use near_crypto::InMemorySigner; -use near_primitives::types::BlockHeight; use serde_json::json; pub async fn fetch_mpc_contract_state( @@ -100,19 +99,3 @@ pub async fn vote_reshared( Ok(result) } - -pub async fn fetch_latest_block_height( - rpc_client: &near_fetch::Client, -) -> anyhow::Result { - let latest_block_height: BlockHeight = rpc_client - .view_block() - .await - .map_err(|e| { - tracing::warn!(%e, "failed to fetch latest block"); - e - })? - .header - .height; - tracing::debug!(latest_block_height, "latest block height"); - Ok(latest_block_height) -} diff --git a/integration-tests/chain-signatures/src/containers.rs b/integration-tests/chain-signatures/src/containers.rs index 3ad699ea..b6b69d17 100644 --- a/integration-tests/chain-signatures/src/containers.rs +++ b/integration-tests/chain-signatures/src/containers.rs @@ -99,7 +99,6 @@ impl<'a> Node<'a> { start_block_height: 0, running_threshold: 120, behind_threshold: 120, - block_height_lag_threshold: 2000, }; let args = mpc_node::cli::Cli::Start { diff --git a/integration-tests/chain-signatures/src/local.rs b/integration-tests/chain-signatures/src/local.rs index 436bd7e2..923ccb15 100644 --- a/integration-tests/chain-signatures/src/local.rs +++ b/integration-tests/chain-signatures/src/local.rs @@ -55,7 +55,6 @@ impl Node { start_block_height: 0, running_threshold: 120, behind_threshold: 120, - block_height_lag_threshold: 2000, }; let near_rpc = ctx.lake_indexer.rpc_host_address.clone(); let mpc_contract_id = ctx.mpc_contract.id().clone(); @@ -151,7 +150,6 @@ impl Node { start_block_height: 0, running_threshold: 120, behind_threshold: 120, - block_height_lag_threshold: 2000, }; let cli = mpc_node::cli::Cli::Start { near_rpc: config.near_rpc.clone(), From 6c792ec4de92da954298d5622ff178bd5ce1531b Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Tue, 29 Oct 2024 17:31:50 -0700 Subject: [PATCH 8/8] rename --- chain-signatures/node/src/indexer.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/chain-signatures/node/src/indexer.rs b/chain-signatures/node/src/indexer.rs index b652c0e5..a744b69d 100644 --- a/chain-signatures/node/src/indexer.rs +++ b/chain-signatures/node/src/indexer.rs @@ -103,7 +103,7 @@ pub struct ContractSignRequest { pub struct Indexer { latest_block_height: Arc>, last_updated_timestamp: Arc>, - latest_block_timestamp_nanoseconds: Arc>>, + latest_block_timestamp_nanosec: Arc>>, running_threshold: Duration, behind_threshold: Duration, } @@ -117,7 +117,7 @@ impl Indexer { Self { latest_block_height: Arc::new(RwLock::new(latest_block_height)), last_updated_timestamp: Arc::new(RwLock::new(Instant::now())), - latest_block_timestamp_nanoseconds: Arc::new(RwLock::new(None)), + latest_block_timestamp_nanosec: Arc::new(RwLock::new(None)), running_threshold: Duration::from_secs(options.running_threshold), behind_threshold: Duration::from_secs(options.behind_threshold), } @@ -135,11 +135,11 @@ impl Indexer { /// Check whether the indexer is behind with the latest block height from the chain. pub async fn is_behind(&self) -> bool { - if let Some(latest_block_timestamp_nanoseconds) = - *self.latest_block_timestamp_nanoseconds.read().await + if let Some(latest_block_timestamp_nanosec) = + *self.latest_block_timestamp_nanosec.read().await { crate::util::is_elapsed_longer_than_timeout( - latest_block_timestamp_nanoseconds / 1000000000, + latest_block_timestamp_nanosec / 1_000_000_000, self.behind_threshold.as_millis() as u64, ) } else { @@ -154,12 +154,12 @@ impl Indexer { async fn update_block_height_and_timestamp( &self, block_height: BlockHeight, - block_timestamp_nanoseconds: u64, + block_timestamp_nanosec: u64, gcp: &GcpService, ) -> Result<(), DatastoreStorageError> { tracing::debug!(block_height, "update_block_height_and_timestamp"); *self.last_updated_timestamp.write().await = Instant::now(); - *self.latest_block_timestamp_nanoseconds.write().await = Some(block_timestamp_nanoseconds); + *self.latest_block_timestamp_nanosec.write().await = Some(block_timestamp_nanosec); self.latest_block_height .write() .await