diff --git a/node/bft/src/helpers/pending.rs b/node/bft/src/helpers/pending.rs index 0397cb64aa..99d77c62cd 100644 --- a/node/bft/src/helpers/pending.rs +++ b/node/bft/src/helpers/pending.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::MAX_FETCH_TIMEOUT_IN_MS; +use crate::max_fetch_timeout_in_ms; use parking_lot::{Mutex, RwLock}; use std::{ @@ -29,8 +29,11 @@ pub const NUM_REDUNDANT_REQUESTS: usize = 2; pub const NUM_REDUNDANT_REQUESTS: usize = 10; /// The maximum number of seconds to wait before expiring a callback. -/// We ensure that we don't truncate `MAX_FETCH_TIMEOUT_IN_MS` when converting to seconds. -const CALLBACK_EXPIRATION_IN_SECS: i64 = (MAX_FETCH_TIMEOUT_IN_MS as i64 + (1000 - 1)) / 1000; +/// The default value is set to the maximum fetch timeout assuming 200 validators. +#[cfg(not(test))] +const CALLBACK_EXPIRATION_IN_MS: u64 = max_fetch_timeout_in_ms(200); +#[cfg(test)] +const CALLBACK_EXPIRATION_IN_MS: u64 = max_fetch_timeout_in_ms(10); #[derive(Debug)] pub struct Pending { @@ -82,7 +85,7 @@ impl Pending { pub fn num_callbacks(&self, item: impl Into) -> usize { let item = item.into(); // Clear the callbacks that have expired. - self.clear_expired_callbacks_for_item(item); + self.clear_expired_callbacks_for_item(item, None); // Return the number of live callbacks. self.callbacks.lock().get(&item).map_or(0, |callbacks| callbacks.len()) } @@ -103,7 +106,7 @@ impl Pending { } // Clear the callbacks that have expired. - self.clear_expired_callbacks_for_item(item); + self.clear_expired_callbacks_for_item(item, None); // Return the result. result @@ -130,8 +133,14 @@ impl Pending { } /// Removes the callbacks for the specified `item` that have expired. - pub fn clear_expired_callbacks_for_item(&self, item: impl Into) { + pub fn clear_expired_callbacks_for_item(&self, item: impl Into, custom_expiration_time_in_ms: Option) { let item = item.into(); + + // Set the custom expiration time in milliseconds. + let expiration_time_in_ms = custom_expiration_time_in_ms.unwrap_or(CALLBACK_EXPIRATION_IN_MS); + // Calculate the expiration time in seconds. + let expiration_time_in_secs = expiration_time_in_ms.div_ceil(1000) as i64; + // Acquire the callbacks lock. let mut callbacks = self.callbacks.lock(); // Clear the callbacks that have expired. @@ -139,7 +148,7 @@ impl Pending { // Fetch the current timestamp. let now = OffsetDateTime::now_utc().unix_timestamp(); // Remove the callbacks that have expired. - callback_values.retain(|(_, timestamp)| now - *timestamp <= CALLBACK_EXPIRATION_IN_SECS); + callback_values.retain(|(_, timestamp)| now - *timestamp <= expiration_time_in_secs); // If there are no more remaining callbacks for the item, remove the item from the pending queue. if callback_values.is_empty() { @@ -150,10 +159,10 @@ impl Pending { } /// Removes the callbacks for all items have that expired. - pub fn clear_expired_callbacks(&self) { + pub fn clear_expired_callbacks(&self, custom_expiration_time_in_ms: Option) { let items = self.pending.read().keys().copied().collect::>(); for item in items.into_iter() { - self.clear_expired_callbacks_for_item(item); + self.clear_expired_callbacks_for_item(item, custom_expiration_time_in_ms); } } } @@ -256,8 +265,8 @@ mod tests { assert!(pending.insert(commitment_1, addr_1, Some(callback_sender_1))); assert!(pending.insert(commitment_1, addr_2, Some(callback_sender_2))); - // Sleep for a few seconds. - thread::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 - 1)); + // Sleep for a few milliseconds. + thread::sleep(Duration::from_millis(CALLBACK_EXPIRATION_IN_MS - 1)); assert!(pending.insert(commitment_1, addr_3, Some(callback_sender_3))); @@ -270,8 +279,8 @@ mod tests { // Ensure that the expired callbacks have been removed. assert_eq!(pending.num_callbacks(commitment_1), 1); - // Wait for ` CALLBACK_EXPIRATION_IN_SECS` seconds. - thread::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64)); + // Wait for ` CALLBACK_EXPIRATION_IN_MS` milliseconds. + thread::sleep(Duration::from_millis(CALLBACK_EXPIRATION_IN_MS)); // Ensure that the expired callbacks have been removed. assert_eq!(pending.num_callbacks(commitment_1), 0); @@ -312,11 +321,11 @@ mod tests { assert_eq!(pending.num_callbacks(commitment_2), 1); assert_eq!(pending.len(), 2); - // Wait for ` CALLBACK_EXPIRATION_IN_SECS + 1` seconds. - thread::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 + 1)); + // Wait for ` CALLBACK_EXPIRATION_IN_MS + 1000` milliseconds. + thread::sleep(Duration::from_millis(CALLBACK_EXPIRATION_IN_MS + 1000)); // Expire the pending callbacks. - pending.clear_expired_callbacks(); + pending.clear_expired_callbacks(None); // Ensure that the items have been expired. assert_eq!(pending.num_callbacks(commitment_1), 0); diff --git a/node/bft/src/lib.rs b/node/bft/src/lib.rs index e466b9d749..2788a20fa5 100644 --- a/node/bft/src/lib.rs +++ b/node/bft/src/lib.rs @@ -48,14 +48,14 @@ pub const MEMORY_POOL_PORT: u16 = 5000; // port /// The maximum number of milliseconds to wait before proposing a batch. pub const MAX_BATCH_DELAY_IN_MS: u64 = 2500; // ms -/// The maximum number of milliseconds to wait before timing out on a fetch. -pub const MAX_FETCH_TIMEOUT_IN_MS: u64 = 3 * MAX_BATCH_DELAY_IN_MS; // ms /// The maximum number of seconds allowed for the leader to send their certificate. pub const MAX_LEADER_CERTIFICATE_DELAY_IN_SECS: i64 = 2 * MAX_BATCH_DELAY_IN_MS as i64 / 1000; // seconds /// The maximum number of seconds before the timestamp is considered expired. pub const MAX_TIMESTAMP_DELTA_IN_SECS: i64 = 10; // seconds /// The maximum number of workers that can be spawned. pub const MAX_WORKERS: u8 = 1; // worker(s) +/// The base number of milliseconds to wait before timing out on a fetch. +pub const BASE_FETCH_TIMEOUT_IN_MS: u64 = 3 * MAX_BATCH_DELAY_IN_MS; // ms /// The frequency at which each primary broadcasts a ping to every other node. /// Note: If this is updated, be sure to update `MAX_BLOCKS_BEHIND` to correspond properly. @@ -73,3 +73,19 @@ macro_rules! spawn_blocking { } }; } + +/// Returns the maximum fetch timeout in ms as a factor of the number of validators. +/// The value is set to `BASE_FETCH_TIMEOUT_IN_MS` + 200ms per validator in the committee +/// with a maximum of 30000ms. +pub const fn max_fetch_timeout_in_ms(num_validators: u64) -> u64 { + const MAX_FETCH_TIMEOUT_IN_MS: u64 = 30000; // 30 seconds + + // Calculate the timeout. + let timeout = BASE_FETCH_TIMEOUT_IN_MS + 200 * num_validators; + + // Bound the timeout to the maximum allowed fetch timeout. + match timeout > MAX_FETCH_TIMEOUT_IN_MS { + true => MAX_FETCH_TIMEOUT_IN_MS, + false => timeout, + } +} diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index 66e319011e..ec1ed60539 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -14,9 +14,9 @@ use crate::{ helpers::{fmt_id, BFTSender, Pending, Storage, SyncReceiver, NUM_REDUNDANT_REQUESTS}, + max_fetch_timeout_in_ms, Gateway, Transport, - MAX_FETCH_TIMEOUT_IN_MS, PRIMARY_PING_IN_MS, }; use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event}; @@ -25,7 +25,7 @@ use snarkos_node_sync::{locators::BlockLocators, BlockSync, BlockSyncMode}; use snarkvm::{ console::{network::Network, types::Field}, ledger::{authority::Authority, block::Block, narwhal::BatchCertificate}, - prelude::{cfg_into_iter, cfg_iter}, + prelude::{cfg_into_iter, cfg_iter, committee::Committee}, }; use anyhow::{bail, Result}; @@ -114,11 +114,18 @@ impl Sync { let self_ = self.clone(); self.spawn(async move { loop { + // Calculate the dynamic fetch timeout in milliseconds. + let num_validators = self_ + .ledger + .get_committee_lookback_for_round(self_.storage.current_round()) + .map_or(Committee::::MAX_COMMITTEE_SIZE as u64, |committee| committee.num_members() as u64); + let timeout_in_ms = max_fetch_timeout_in_ms(num_validators); + // Sleep briefly. - tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await; + tokio::time::sleep(Duration::from_millis(timeout_in_ms)).await; // Remove the expired pending transmission requests. - self_.pending.clear_expired_callbacks(); + self_.pending.clear_expired_callbacks(Some(timeout_in_ms)); } }); @@ -436,8 +443,14 @@ impl Sync { trace!("Skipped sending redundant request for certificate {} to '{peer_ip}'", fmt_id(certificate_id)); } } + // Calculate the dynamic fetch timeout in milliseconds. + let num_validators = self + .ledger + .get_committee_lookback_for_round(self.storage.current_round()) + .map_or(Committee::::MAX_COMMITTEE_SIZE as u64, |committee| committee.num_members() as u64); + let timeout_in_ms = max_fetch_timeout_in_ms(num_validators); // Wait for the certificate to be fetched. - match tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await { + match tokio::time::timeout(Duration::from_millis(timeout_in_ms), callback_receiver).await { // If the certificate was fetched, return it. Ok(result) => Ok(result?), // If the certificate was not fetched, return an error. diff --git a/node/bft/src/worker.rs b/node/bft/src/worker.rs index 929f64a098..f54e572739 100644 --- a/node/bft/src/worker.rs +++ b/node/bft/src/worker.rs @@ -15,9 +15,9 @@ use crate::{ events::{Event, TransmissionRequest, TransmissionResponse}, helpers::{fmt_id, Pending, Ready, Storage, WorkerReceiver, NUM_REDUNDANT_REQUESTS}, + max_fetch_timeout_in_ms, ProposedBatch, Transport, - MAX_FETCH_TIMEOUT_IN_MS, MAX_WORKERS, }; use snarkos_node_bft_ledger_service::LedgerService; @@ -28,6 +28,7 @@ use snarkvm::{ coinbase::{ProverSolution, PuzzleCommitment}, narwhal::{BatchHeader, Data, Transmission, TransmissionID}, }, + prelude::committee::Committee, }; use indexmap::{IndexMap, IndexSet}; @@ -354,11 +355,18 @@ impl Worker { let self_ = self.clone(); self.spawn(async move { loop { + // Calculate the dynamic fetch timeout in milliseconds. + let num_validators = self_ + .ledger + .get_committee_lookback_for_round(self_.storage.current_round()) + .map_or(Committee::::MAX_COMMITTEE_SIZE as u64, |committee| committee.num_members() as u64); + let timeout_in_ms = max_fetch_timeout_in_ms(num_validators); + // Sleep briefly. - tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await; + tokio::time::sleep(Duration::from_millis(timeout_in_ms)).await; // Remove the expired pending certificate requests. - self_.pending.clear_expired_callbacks(); + self_.pending.clear_expired_callbacks(Some(timeout_in_ms)); } }); @@ -409,8 +417,14 @@ impl Worker { } else { trace!("Skipped sending redundant request for transmission {} to '{peer_ip}'", fmt_id(transmission_id)); } + // Calculate the dynamic fetch timeout in milliseconds. + let num_validators = self + .ledger + .get_committee_lookback_for_round(self.storage.current_round()) + .map_or(Committee::::MAX_COMMITTEE_SIZE as u64, |committee| committee.num_members() as u64); + let timeout_in_ms = max_fetch_timeout_in_ms(num_validators); // Wait for the transmission to be fetched. - match timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await { + match timeout(Duration::from_millis(timeout_in_ms), callback_receiver).await { // If the transmission was fetched, return it. Ok(result) => Ok((transmission_id, result?)), // If the transmission was not fetched, return an error. @@ -544,10 +558,12 @@ mod tests { let rng = &mut TestRng::default(); // Sample a committee. let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng); + let committee_clone = committee.clone(); // Setup the mock gateway and ledger. let gateway = MockGateway::default(); let mut mock_ledger = MockLedger::default(); mock_ledger.expect_current_committee().returning(move || Ok(committee.clone())); + mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone())); mock_ledger.expect_contains_transmission().returning(|_| Ok(false)); mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(())); let ledger: Arc> = Arc::new(mock_ledger); @@ -577,6 +593,7 @@ mod tests { let rng = &mut TestRng::default(); // Sample a committee. let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng); + let committee_clone = committee.clone(); // Setup the mock gateway and ledger. let mut gateway = MockGateway::default(); gateway.expect_send().returning(|_, _| { @@ -585,6 +602,7 @@ mod tests { }); let mut mock_ledger = MockLedger::default(); mock_ledger.expect_current_committee().returning(move || Ok(committee.clone())); + mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone())); mock_ledger.expect_ensure_transmission_id_matches().returning(|_, _| Ok(())); let ledger: Arc> = Arc::new(mock_ledger); // Initialize the storage. @@ -612,6 +630,7 @@ mod tests { let rng = &mut TestRng::default(); // Sample a committee. let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng); + let committee_clone = committee.clone(); // Setup the mock gateway and ledger. let mut gateway = MockGateway::default(); gateway.expect_send().returning(|_, _| { @@ -620,6 +639,7 @@ mod tests { }); let mut mock_ledger = MockLedger::default(); mock_ledger.expect_current_committee().returning(move || Ok(committee.clone())); + mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone())); mock_ledger.expect_contains_transmission().returning(|_| Ok(false)); mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(())); let ledger: Arc> = Arc::new(mock_ledger); @@ -650,6 +670,7 @@ mod tests { let rng = &mut TestRng::default(); // Sample a committee. let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng); + let committee_clone = committee.clone(); // Setup the mock gateway and ledger. let mut gateway = MockGateway::default(); gateway.expect_send().returning(|_, _| { @@ -658,6 +679,7 @@ mod tests { }); let mut mock_ledger = MockLedger::default(); mock_ledger.expect_current_committee().returning(move || Ok(committee.clone())); + mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone())); mock_ledger.expect_contains_transmission().returning(|_| Ok(false)); mock_ledger.expect_check_solution_basic().returning(|_, _| Err(anyhow!(""))); let ledger: Arc> = Arc::new(mock_ledger); @@ -688,6 +710,7 @@ mod tests { let mut rng = &mut TestRng::default(); // Sample a committee. let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng); + let committee_clone = committee.clone(); // Setup the mock gateway and ledger. let mut gateway = MockGateway::default(); gateway.expect_send().returning(|_, _| { @@ -696,6 +719,7 @@ mod tests { }); let mut mock_ledger = MockLedger::default(); mock_ledger.expect_current_committee().returning(move || Ok(committee.clone())); + mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone())); mock_ledger.expect_contains_transmission().returning(|_| Ok(false)); mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(())); let ledger: Arc> = Arc::new(mock_ledger); @@ -726,6 +750,7 @@ mod tests { let mut rng = &mut TestRng::default(); // Sample a committee. let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng); + let committee_clone = committee.clone(); // Setup the mock gateway and ledger. let mut gateway = MockGateway::default(); gateway.expect_send().returning(|_, _| { @@ -734,6 +759,7 @@ mod tests { }); let mut mock_ledger = MockLedger::default(); mock_ledger.expect_current_committee().returning(move || Ok(committee.clone())); + mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone())); mock_ledger.expect_contains_transmission().returning(|_| Ok(false)); mock_ledger.expect_check_transaction_basic().returning(|_, _| Err(anyhow!(""))); let ledger: Arc> = Arc::new(mock_ledger); diff --git a/node/bft/tests/bft_e2e.rs b/node/bft/tests/bft_e2e.rs index cd2a442f52..e6be0edb0c 100644 --- a/node/bft/tests/bft_e2e.rs +++ b/node/bft/tests/bft_e2e.rs @@ -18,7 +18,7 @@ mod common; use crate::common::primary::{TestNetwork, TestNetworkConfig}; use deadline::deadline; use itertools::Itertools; -use snarkos_node_bft::MAX_FETCH_TIMEOUT_IN_MS; +use snarkos_node_bft::BASE_FETCH_TIMEOUT_IN_MS; use std::time::Duration; use tokio::time::sleep; @@ -114,7 +114,7 @@ async fn test_quorum_threshold() { // Start the cannons for node 0. network.fire_transmissions_at(0, TRANSMISSION_INTERVAL_MS); - sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await; + sleep(Duration::from_millis(BASE_FETCH_TIMEOUT_IN_MS)).await; // Check each node is still at round 1. for validator in network.validators.values() { @@ -125,7 +125,7 @@ async fn test_quorum_threshold() { network.connect_validators(0, 1).await; network.fire_transmissions_at(1, TRANSMISSION_INTERVAL_MS); - sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await; + sleep(Duration::from_millis(BASE_FETCH_TIMEOUT_IN_MS)).await; // Check each node is still at round 1. for validator in network.validators.values() { diff --git a/node/bft/tests/narwhal_e2e.rs b/node/bft/tests/narwhal_e2e.rs index ebf4c47eb6..256167f355 100644 --- a/node/bft/tests/narwhal_e2e.rs +++ b/node/bft/tests/narwhal_e2e.rs @@ -16,7 +16,7 @@ mod common; use crate::common::primary::{TestNetwork, TestNetworkConfig}; -use snarkos_node_bft::MAX_FETCH_TIMEOUT_IN_MS; +use snarkos_node_bft::BASE_FETCH_TIMEOUT_IN_MS; use std::time::Duration; @@ -72,7 +72,7 @@ async fn test_quorum_threshold() { // Start the cannons for node 0. network.fire_transmissions_at(0, TRANSMISSION_INTERVAL_MS); - sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await; + sleep(Duration::from_millis(BASE_FETCH_TIMEOUT_IN_MS)).await; // Check each node is still at round 1. for validator in network.validators.values() { @@ -83,7 +83,7 @@ async fn test_quorum_threshold() { network.connect_validators(0, 1).await; network.fire_transmissions_at(1, TRANSMISSION_INTERVAL_MS); - sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await; + sleep(Duration::from_millis(BASE_FETCH_TIMEOUT_IN_MS)).await; // Check each node is still at round 1. for validator in network.validators.values() {