Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update fetch timeouts as a factor of committee size #3139

Draft
wants to merge 4 commits into
base: staging
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions node/bft/src/helpers/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::MAX_FETCH_TIMEOUT_IN_MS;
use crate::max_fetch_timeout_in_ms;

use parking_lot::{Mutex, RwLock};
use std::{
Expand All @@ -29,8 +29,11 @@ pub const NUM_REDUNDANT_REQUESTS: usize = 2;
pub const NUM_REDUNDANT_REQUESTS: usize = 10;

/// The maximum number of seconds to wait before expiring a callback.
/// We ensure that we don't truncate `MAX_FETCH_TIMEOUT_IN_MS` when converting to seconds.
const CALLBACK_EXPIRATION_IN_SECS: i64 = (MAX_FETCH_TIMEOUT_IN_MS as i64 + (1000 - 1)) / 1000;
/// The default value is set to the maximum fetch timeout assuming 200 validators.
#[cfg(not(test))]
const CALLBACK_EXPIRATION_IN_MS: u64 = max_fetch_timeout_in_ms(200);
#[cfg(test)]
const CALLBACK_EXPIRATION_IN_MS: u64 = max_fetch_timeout_in_ms(10);

#[derive(Debug)]
pub struct Pending<T: PartialEq + Eq + Hash, V: Clone> {
Expand Down Expand Up @@ -82,7 +85,7 @@ impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Pending<T, V> {
pub fn num_callbacks(&self, item: impl Into<T>) -> usize {
let item = item.into();
// Clear the callbacks that have expired.
self.clear_expired_callbacks_for_item(item);
self.clear_expired_callbacks_for_item(item, None);
// Return the number of live callbacks.
self.callbacks.lock().get(&item).map_or(0, |callbacks| callbacks.len())
}
Expand All @@ -103,7 +106,7 @@ impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Pending<T, V> {
}

// Clear the callbacks that have expired.
self.clear_expired_callbacks_for_item(item);
self.clear_expired_callbacks_for_item(item, None);

// Return the result.
result
Expand All @@ -130,16 +133,22 @@ impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Pending<T, V> {
}

/// Removes the callbacks for the specified `item` that have expired.
pub fn clear_expired_callbacks_for_item(&self, item: impl Into<T>) {
pub fn clear_expired_callbacks_for_item(&self, item: impl Into<T>, custom_expiration_time_in_ms: Option<u64>) {
let item = item.into();

// Set the custom expiration time in milliseconds.
let expiration_time_in_ms = custom_expiration_time_in_ms.unwrap_or(CALLBACK_EXPIRATION_IN_MS);
// Calculate the expiration time in seconds.
let expiration_time_in_secs = expiration_time_in_ms.div_ceil(1000) as i64;

// Acquire the callbacks lock.
let mut callbacks = self.callbacks.lock();
// Clear the callbacks that have expired.
if let Some(callback_values) = callbacks.get_mut(&item) {
// Fetch the current timestamp.
let now = OffsetDateTime::now_utc().unix_timestamp();
// Remove the callbacks that have expired.
callback_values.retain(|(_, timestamp)| now - *timestamp <= CALLBACK_EXPIRATION_IN_SECS);
callback_values.retain(|(_, timestamp)| now - *timestamp <= expiration_time_in_secs);

// If there are no more remaining callbacks for the item, remove the item from the pending queue.
if callback_values.is_empty() {
Expand All @@ -150,10 +159,10 @@ impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Pending<T, V> {
}

/// Removes the callbacks for all items have that expired.
pub fn clear_expired_callbacks(&self) {
pub fn clear_expired_callbacks(&self, custom_expiration_time_in_ms: Option<u64>) {
let items = self.pending.read().keys().copied().collect::<Vec<T>>();
for item in items.into_iter() {
self.clear_expired_callbacks_for_item(item);
self.clear_expired_callbacks_for_item(item, custom_expiration_time_in_ms);
}
}
}
Expand Down Expand Up @@ -256,8 +265,8 @@ mod tests {
assert!(pending.insert(commitment_1, addr_1, Some(callback_sender_1)));
assert!(pending.insert(commitment_1, addr_2, Some(callback_sender_2)));

// Sleep for a few seconds.
thread::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 - 1));
// Sleep for a few milliseconds.
thread::sleep(Duration::from_millis(CALLBACK_EXPIRATION_IN_MS - 1));

assert!(pending.insert(commitment_1, addr_3, Some(callback_sender_3)));

Expand All @@ -270,8 +279,8 @@ mod tests {
// Ensure that the expired callbacks have been removed.
assert_eq!(pending.num_callbacks(commitment_1), 1);

// Wait for ` CALLBACK_EXPIRATION_IN_SECS` seconds.
thread::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64));
// Wait for ` CALLBACK_EXPIRATION_IN_MS` milliseconds.
thread::sleep(Duration::from_millis(CALLBACK_EXPIRATION_IN_MS));

// Ensure that the expired callbacks have been removed.
assert_eq!(pending.num_callbacks(commitment_1), 0);
Expand Down Expand Up @@ -312,11 +321,11 @@ mod tests {
assert_eq!(pending.num_callbacks(commitment_2), 1);
assert_eq!(pending.len(), 2);

// Wait for ` CALLBACK_EXPIRATION_IN_SECS + 1` seconds.
thread::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 + 1));
// Wait for ` CALLBACK_EXPIRATION_IN_MS + 1000` milliseconds.
thread::sleep(Duration::from_millis(CALLBACK_EXPIRATION_IN_MS + 1000));

// Expire the pending callbacks.
pending.clear_expired_callbacks();
pending.clear_expired_callbacks(None);

// Ensure that the items have been expired.
assert_eq!(pending.num_callbacks(commitment_1), 0);
Expand Down
20 changes: 18 additions & 2 deletions node/bft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ pub const MEMORY_POOL_PORT: u16 = 5000; // port

/// The maximum number of milliseconds to wait before proposing a batch.
pub const MAX_BATCH_DELAY_IN_MS: u64 = 2500; // ms
/// The maximum number of milliseconds to wait before timing out on a fetch.
pub const MAX_FETCH_TIMEOUT_IN_MS: u64 = 3 * MAX_BATCH_DELAY_IN_MS; // ms
/// The maximum number of seconds allowed for the leader to send their certificate.
pub const MAX_LEADER_CERTIFICATE_DELAY_IN_SECS: i64 = 2 * MAX_BATCH_DELAY_IN_MS as i64 / 1000; // seconds
/// The maximum number of seconds before the timestamp is considered expired.
pub const MAX_TIMESTAMP_DELTA_IN_SECS: i64 = 10; // seconds
/// The maximum number of workers that can be spawned.
pub const MAX_WORKERS: u8 = 1; // worker(s)
/// The base number of milliseconds to wait before timing out on a fetch.
pub const BASE_FETCH_TIMEOUT_IN_MS: u64 = 3 * MAX_BATCH_DELAY_IN_MS; // ms

/// The frequency at which each primary broadcasts a ping to every other node.
/// Note: If this is updated, be sure to update `MAX_BLOCKS_BEHIND` to correspond properly.
Expand All @@ -73,3 +73,19 @@ macro_rules! spawn_blocking {
}
};
}

/// Returns the maximum fetch timeout in ms as a factor of the number of validators.
/// The value is set to `BASE_FETCH_TIMEOUT_IN_MS` + 200ms per validator in the committee
/// with a maximum of 30000ms.
pub const fn max_fetch_timeout_in_ms(num_validators: u64) -> u64 {
const MAX_FETCH_TIMEOUT_IN_MS: u64 = 30000; // 30 seconds

// Calculate the timeout.
let timeout = BASE_FETCH_TIMEOUT_IN_MS + 200 * num_validators;

// Bound the timeout to the maximum allowed fetch timeout.
match timeout > MAX_FETCH_TIMEOUT_IN_MS {
true => MAX_FETCH_TIMEOUT_IN_MS,
false => timeout,
}
ljedrz marked this conversation as resolved.
Show resolved Hide resolved
}
23 changes: 18 additions & 5 deletions node/bft/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

use crate::{
helpers::{fmt_id, BFTSender, Pending, Storage, SyncReceiver, NUM_REDUNDANT_REQUESTS},
max_fetch_timeout_in_ms,
Gateway,
Transport,
MAX_FETCH_TIMEOUT_IN_MS,
PRIMARY_PING_IN_MS,
};
use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
Expand All @@ -25,7 +25,7 @@ use snarkos_node_sync::{locators::BlockLocators, BlockSync, BlockSyncMode};
use snarkvm::{
console::{network::Network, types::Field},
ledger::{authority::Authority, block::Block, narwhal::BatchCertificate},
prelude::{cfg_into_iter, cfg_iter},
prelude::{cfg_into_iter, cfg_iter, committee::Committee},
};

use anyhow::{bail, Result};
Expand Down Expand Up @@ -114,11 +114,18 @@ impl<N: Network> Sync<N> {
let self_ = self.clone();
self.spawn(async move {
loop {
// Calculate the dynamic fetch timeout in milliseconds.
let num_validators = self_
.ledger
.get_committee_lookback_for_round(self_.storage.current_round())
.map_or(Committee::<N>::MAX_COMMITTEE_SIZE as u64, |committee| committee.num_members() as u64);
let timeout_in_ms = max_fetch_timeout_in_ms(num_validators);

// Sleep briefly.
tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
tokio::time::sleep(Duration::from_millis(timeout_in_ms)).await;

// Remove the expired pending transmission requests.
self_.pending.clear_expired_callbacks();
self_.pending.clear_expired_callbacks(Some(timeout_in_ms));
}
});

Expand Down Expand Up @@ -436,8 +443,14 @@ impl<N: Network> Sync<N> {
trace!("Skipped sending redundant request for certificate {} to '{peer_ip}'", fmt_id(certificate_id));
}
}
// Calculate the dynamic fetch timeout in milliseconds.
let num_validators = self
.ledger
.get_committee_lookback_for_round(self.storage.current_round())
.map_or(Committee::<N>::MAX_COMMITTEE_SIZE as u64, |committee| committee.num_members() as u64);
let timeout_in_ms = max_fetch_timeout_in_ms(num_validators);
Comment on lines +447 to +451
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this is the next time this is calculated, maybe a dedicated method would be a good idea?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I can also see this done in the worker module)

// Wait for the certificate to be fetched.
match tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
match tokio::time::timeout(Duration::from_millis(timeout_in_ms), callback_receiver).await {
// If the certificate was fetched, return it.
Ok(result) => Ok(result?),
// If the certificate was not fetched, return an error.
Expand Down
34 changes: 30 additions & 4 deletions node/bft/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
use crate::{
events::{Event, TransmissionRequest, TransmissionResponse},
helpers::{fmt_id, Pending, Ready, Storage, WorkerReceiver, NUM_REDUNDANT_REQUESTS},
max_fetch_timeout_in_ms,
ProposedBatch,
Transport,
MAX_FETCH_TIMEOUT_IN_MS,
MAX_WORKERS,
};
use snarkos_node_bft_ledger_service::LedgerService;
Expand All @@ -28,6 +28,7 @@ use snarkvm::{
coinbase::{ProverSolution, PuzzleCommitment},
narwhal::{BatchHeader, Data, Transmission, TransmissionID},
},
prelude::committee::Committee,
};

use indexmap::{IndexMap, IndexSet};
Expand Down Expand Up @@ -354,11 +355,18 @@ impl<N: Network> Worker<N> {
let self_ = self.clone();
self.spawn(async move {
loop {
// Calculate the dynamic fetch timeout in milliseconds.
let num_validators = self_
.ledger
.get_committee_lookback_for_round(self_.storage.current_round())
.map_or(Committee::<N>::MAX_COMMITTEE_SIZE as u64, |committee| committee.num_members() as u64);
let timeout_in_ms = max_fetch_timeout_in_ms(num_validators);

// Sleep briefly.
tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
tokio::time::sleep(Duration::from_millis(timeout_in_ms)).await;

// Remove the expired pending certificate requests.
self_.pending.clear_expired_callbacks();
self_.pending.clear_expired_callbacks(Some(timeout_in_ms));
}
});

Expand Down Expand Up @@ -409,8 +417,14 @@ impl<N: Network> Worker<N> {
} else {
trace!("Skipped sending redundant request for transmission {} to '{peer_ip}'", fmt_id(transmission_id));
}
// Calculate the dynamic fetch timeout in milliseconds.
let num_validators = self
.ledger
.get_committee_lookback_for_round(self.storage.current_round())
.map_or(Committee::<N>::MAX_COMMITTEE_SIZE as u64, |committee| committee.num_members() as u64);
let timeout_in_ms = max_fetch_timeout_in_ms(num_validators);
// Wait for the transmission to be fetched.
match timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
match timeout(Duration::from_millis(timeout_in_ms), callback_receiver).await {
// If the transmission was fetched, return it.
Ok(result) => Ok((transmission_id, result?)),
// If the transmission was not fetched, return an error.
Expand Down Expand Up @@ -544,10 +558,12 @@ mod tests {
let rng = &mut TestRng::default();
// Sample a committee.
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let committee_clone = committee.clone();
// Setup the mock gateway and ledger.
let gateway = MockGateway::default();
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
Expand Down Expand Up @@ -577,6 +593,7 @@ mod tests {
let rng = &mut TestRng::default();
// Sample a committee.
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let committee_clone = committee.clone();
// Setup the mock gateway and ledger.
let mut gateway = MockGateway::default();
gateway.expect_send().returning(|_, _| {
Expand All @@ -585,6 +602,7 @@ mod tests {
});
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
mock_ledger.expect_ensure_transmission_id_matches().returning(|_, _| Ok(()));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
// Initialize the storage.
Expand Down Expand Up @@ -612,6 +630,7 @@ mod tests {
let rng = &mut TestRng::default();
// Sample a committee.
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let committee_clone = committee.clone();
// Setup the mock gateway and ledger.
let mut gateway = MockGateway::default();
gateway.expect_send().returning(|_, _| {
Expand All @@ -620,6 +639,7 @@ mod tests {
});
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(()));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
Expand Down Expand Up @@ -650,6 +670,7 @@ mod tests {
let rng = &mut TestRng::default();
// Sample a committee.
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let committee_clone = committee.clone();
// Setup the mock gateway and ledger.
let mut gateway = MockGateway::default();
gateway.expect_send().returning(|_, _| {
Expand All @@ -658,6 +679,7 @@ mod tests {
});
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
mock_ledger.expect_check_solution_basic().returning(|_, _| Err(anyhow!("")));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
Expand Down Expand Up @@ -688,6 +710,7 @@ mod tests {
let mut rng = &mut TestRng::default();
// Sample a committee.
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let committee_clone = committee.clone();
// Setup the mock gateway and ledger.
let mut gateway = MockGateway::default();
gateway.expect_send().returning(|_, _| {
Expand All @@ -696,6 +719,7 @@ mod tests {
});
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(()));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
Expand Down Expand Up @@ -726,6 +750,7 @@ mod tests {
let mut rng = &mut TestRng::default();
// Sample a committee.
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
let committee_clone = committee.clone();
// Setup the mock gateway and ledger.
let mut gateway = MockGateway::default();
gateway.expect_send().returning(|_, _| {
Expand All @@ -734,6 +759,7 @@ mod tests {
});
let mut mock_ledger = MockLedger::default();
mock_ledger.expect_current_committee().returning(move || Ok(committee.clone()));
mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone()));
mock_ledger.expect_contains_transmission().returning(|_| Ok(false));
mock_ledger.expect_check_transaction_basic().returning(|_, _| Err(anyhow!("")));
let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger);
Expand Down
6 changes: 3 additions & 3 deletions node/bft/tests/bft_e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod common;
use crate::common::primary::{TestNetwork, TestNetworkConfig};
use deadline::deadline;
use itertools::Itertools;
use snarkos_node_bft::MAX_FETCH_TIMEOUT_IN_MS;
use snarkos_node_bft::BASE_FETCH_TIMEOUT_IN_MS;
use std::time::Duration;
use tokio::time::sleep;

Expand Down Expand Up @@ -114,7 +114,7 @@ async fn test_quorum_threshold() {
// Start the cannons for node 0.
network.fire_transmissions_at(0, TRANSMISSION_INTERVAL_MS);

sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
sleep(Duration::from_millis(BASE_FETCH_TIMEOUT_IN_MS)).await;

// Check each node is still at round 1.
for validator in network.validators.values() {
Expand All @@ -125,7 +125,7 @@ async fn test_quorum_threshold() {
network.connect_validators(0, 1).await;
network.fire_transmissions_at(1, TRANSMISSION_INTERVAL_MS);

sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
sleep(Duration::from_millis(BASE_FETCH_TIMEOUT_IN_MS)).await;

// Check each node is still at round 1.
for validator in network.validators.values() {
Expand Down
Loading