From 94f7b6e603a144928bef7d960c495e0f31c29d70 Mon Sep 17 00:00:00 2001 From: "Andrew J. Stone" Date: Thu, 17 Apr 2025 22:34:58 +0000 Subject: [PATCH 1/4] Trust quorum: reconfiguration and commit behavior This PR adds further functionality to the sans-io trust quorum protocol. Configurations can now be committed via `Node::commit_reconfiguration`. For each reconfiguration attempt made on top of a committed configuration, the rack secret for the last committed configuration will be reconstructed after retrieving a threshold of shares from members of that configuration. At this point, this "old" rack secret will be encrypted with a key derived from the rack secret for the current configuration being coordinated and included as necessary in prepare messages sent out during coordination. The property-based test for coordinator behavior has been expanded to include support for this functionality, as well as to allow dropping messages between nodes if such an action is generated. The bulk of this PR lies in the test code, which has been restructured to handle multiple reconfigurations and commits. This has led to the tracking of shares across non-existent test nodes, and to enhancements to the model. Additionally, a small change was made to move some of the errors out of `validators.rs` and into their own file. --- trust-quorum/src/coordinator_state.rs | 386 +++++++++++++++- trust-quorum/src/crypto.rs | 74 ++- trust-quorum/src/errors.rs | 103 +++++ trust-quorum/src/lib.rs | 8 + trust-quorum/src/node.rs | 135 +++++- trust-quorum/src/persistent_state.rs | 12 +- trust-quorum/src/validators.rs | 82 +--- trust-quorum/tests/coordinator.rs | 627 +++++++++++++++++++++----- 8 files changed, 1200 insertions(+), 227 deletions(-) create mode 100644 trust-quorum/src/errors.rs diff --git a/trust-quorum/src/coordinator_state.rs b/trust-quorum/src/coordinator_state.rs index b810216a085..3100ae5bd77 100644 --- a/trust-quorum/src/coordinator_state.rs +++ b/trust-quorum/src/coordinator_state.rs @@ -4,13 +4,18 @@ //! State of a reconfiguration coordinator inside a [`crate::Node`] -use crate::crypto::{LrtqShare, Sha3_256Digest, ShareDigestLrtq}; +use crate::configuration::PreviousConfiguration; +use crate::crypto::{self, LrtqShare, Sha3_256Digest, ShareDigestLrtq}; +use crate::errors::ReconfigurationError; use crate::messages::{PeerMsg, PrepareMsg}; -use crate::validators::{ReconfigurationError, ValidatedReconfigureMsg}; +use crate::validators::ValidatedReconfigureMsg; -use crate::{Configuration, Envelope, Epoch, PlatformId}; +use crate::{ + Configuration, Envelope, Epoch, PlatformId, RackSecret, Threshold, +}; use gfss::shamir::Share; -use slog::{Logger, o, warn}; +use slog::{Logger, error, info, o, warn}; use std::collections::{BTreeMap, BTreeSet}; +use std::mem; use std::time::Instant; /// The state of a reconfiguration coordinator. 
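As background for the key handling described in the commit message above: the old rack secret is wrapped under a key derived from the new rack secret. A minimal standalone sketch of that wrapping step, using the same `hkdf`, `sha3`, and `chacha20poly1305` crates this patch pulls in; the function name, fixed-size byte parameters, and unwrapped errors are illustrative stand-ins for the crate's `RackSecret`/`Salt` types rather than its actual API (compare `encrypt_old_rack_secret` in `crypto.rs` further down):

    use chacha20poly1305::{aead::Aead, ChaCha20Poly1305, Key, KeyInit};
    use hkdf::Hkdf;
    use sha3::Sha3_256;

    // Wrap the *old* rack secret under a key derived from the *new* rack secret.
    // Plain 32-byte arrays stand in for the crate's `ReconstructedRackSecret`;
    // errors are unwrapped for brevity.
    fn wrap_old_rack_secret(
        old_secret: &[u8; 32],
        new_secret: &[u8; 32],
        rack_id: &[u8; 16],
        old_epoch: u64,
        new_epoch: u64,
        salt: &[u8; 32],
    ) -> Vec<u8> {
        // HKDF-Extract (SHA3-256) with the new rack secret as input keying material.
        let prk = Hkdf::<Sha3_256>::new(Some(&salt[..]), &new_secret[..]);

        // HKDF-Expand with context binding the key to its purpose: a protocol
        // label, the rack id, and both epochs.
        let mut key = [0u8; 32];
        prk.expand_multi_info(
            &[
                b"trust-quorum-v1-rack-secret",
                &rack_id[..],
                &new_epoch.to_be_bytes(),
                &old_epoch.to_be_bytes(),
            ],
            &mut key,
        )
        .expect("32 bytes is a valid HKDF-SHA3-256 output length");

        // The derived key encrypts exactly one plaintext, so a zero nonce is safe.
        let cipher = ChaCha20Poly1305::new(Key::from_slice(&key));
        let nonce = [0u8; 12].into();
        cipher
            .encrypt(&nonce, &old_secret[..])
            .expect("encrypting 32 bytes cannot fail")
    }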
@@ -77,9 +82,16 @@ impl CoordinatorState { } let op = CoordinatorOperation::Prepare { prepares, - prepare_acks: BTreeSet::new(), + // Always include ourself + prepare_acks: BTreeSet::from([msg.coordinator_id().clone()]), }; + info!( + log, + "Starting coordination on uninitialized node"; + "epoch" => %config.epoch + ); + let state = CoordinatorState::new(log, now, msg, config, op); // Safety: Construction of a `ValidatedReconfigureMsg` ensures that @@ -94,15 +106,28 @@ impl CoordinatorState { now: Instant, msg: ValidatedReconfigureMsg, last_committed_config: &Configuration, + our_last_committed_share: Share, ) -> Result<CoordinatorState, ReconfigurationError> { let (config, new_shares) = Configuration::new(&msg)?; + info!( + log, + "Starting coordination on existing node"; + "epoch" => %config.epoch, + "last_committed_epoch" => %last_committed_config.epoch + ); + // We must collect shares from the last configuration // so we can recompute the old rack secret. let op = CoordinatorOperation::CollectShares { - epoch: last_committed_config.epoch, - members: last_committed_config.members.clone(), - collected_shares: BTreeMap::new(), + last_committed_epoch: last_committed_config.epoch, + last_committed_members: last_committed_config.members.clone(), + last_committed_threshold: last_committed_config.threshold, + // Always include ourself + collected_shares: BTreeMap::from([( + msg.coordinator_id().clone(), + our_last_committed_share, + )]), new_shares, }; @@ -146,7 +171,6 @@ impl CoordinatorState { // will return a copy of it. // // This method is "in progress" - allow unused parameters for now - #[expect(unused)] pub fn send_msgs(&mut self, now: Instant, outbox: &mut Vec<Envelope>) { if now < self.retry_deadline { return; @@ -154,13 +178,31 @@ impl CoordinatorState { self.retry_deadline = now + self.reconfigure_msg.retry_timeout(); match &self.op { CoordinatorOperation::CollectShares { - epoch, - members, + last_committed_epoch, + last_committed_members, collected_shares, .. - } => {} - CoordinatorOperation::CollectLrtqShares { members, shares } => {} - CoordinatorOperation::Prepare { prepares, prepare_acks } => { + } => { + // Send to all members that we haven't yet collected shares from. + // Also exclude ourself. + for member in last_committed_members + .keys() + .filter(|&m| { + m != self.reconfigure_msg.coordinator_id() + && !collected_shares.contains_key(m) + }) + .cloned() + { + outbox.push(Envelope { + to: member, + from: self.reconfigure_msg.coordinator_id().clone(), + msg: PeerMsg::GetShare(*last_committed_epoch), + }); + } + } + CoordinatorOperation::CollectLrtqShares { .. } => {} + CoordinatorOperation::Prepare { prepares, .. } => { + // prepares already filter ourself for (platform_id, prepare) in prepares.clone().into_iter() { outbox.push(Envelope { to: platform_id, @@ -205,24 +247,300 @@ impl CoordinatorState { } } } + + /// Handle a share response for the last_committed epoch. + /// + /// If we transition from collecting shares to sending prepare messages, + /// we also return our own `PrepareMsg` that must be saved as part of the + /// persistent state. 
+ pub fn handle_share( + &mut self, + now: Instant, + outbox: &mut Vec, + from: PlatformId, + epoch: Epoch, + share: Share, + ) -> Option { + match &mut self.op { + CoordinatorOperation::CollectShares { + last_committed_epoch, + last_committed_members, + last_committed_threshold, + collected_shares, + new_shares, + } => { + // First, perform some validation on the incoming share + if *last_committed_epoch != epoch { + warn!( + self.log, + "Received Share from node with wrong epoch"; + "epoch" => %epoch, + "from" => %from + ); + return None; + } + + let Some(expected_digest) = last_committed_members.get(&from) + else { + warn!( + self.log, + "Received Share from unexpected node"; + "epoch" => %epoch, + "from" => %from + ); + return None; + }; + + let mut digest = Sha3_256Digest::default(); + share.digest::(&mut digest.0); + if digest != *expected_digest { + error!( + self.log, + "Received share with invalid digest"; + "epoch" => %epoch, + "from" => %from + ); + } + + // A valid share was received. Is it new? + if collected_shares.insert(from, share).is_some() { + return None; + } + // + // Do we have enough shares to recompute the rack secret + // for `epoch`? + if collected_shares.len() < last_committed_threshold.0 as usize + { + return None; + } + + // Reconstruct the old rack secret from the shares we collected. + let old_rack_secret = match RackSecret::reconstruct_from_iter( + collected_shares.values(), + ) { + Ok(old_rack_secret) => { + info!( + self.log, + "Successfully reconstructed old rack secret"; + "last_committed_epoch" => %epoch, + "epoch" => %self.configuration.epoch + ); + + old_rack_secret + } + Err(err) => { + error!( + self.log, + "Failed to reconstruct old rack secret: {err}"; + "epoch" => %epoch + ); + return None; + } + }; + + // Reconstruct the new rack secret from the new shares. 
+ let new_rack_secret = match RackSecret::reconstruct_from_iter( + new_shares.values(), + ) { + Ok(new_rack_secret) => { + info!( + self.log, + "Successfully reconstructed new rack secret"; + "last_committed_epoch" => %epoch, + "epoch" => %self.configuration.epoch + ); + new_rack_secret + } + Err(err) => { + error!( + self.log, + "Failed to reconstruct new rack secret: {err}"; + "epoch" => %epoch + ); + return None; + } + }; + + // Encrypt our old secret with a key derived from the new secret + let (encrypted_last_committed_rack_secret, salt) = + match crypto::encrypt_old_rack_secret( + old_rack_secret, + new_rack_secret, + self.configuration.rack_id, + *last_committed_epoch, + self.configuration.epoch, + ) { + Ok(val) => val, + Err(_) => { + error!( + self.log, "Failed to encrypt old rack secret"; + "last_committed_epoch" => %epoch, + "epoch" => %self.configuration.epoch + ); + return None; + } + }; + + // Create and set our previous configuration + assert!(self.configuration.previous_configuration.is_none()); + let previous_config = PreviousConfiguration { + epoch: *last_committed_epoch, + is_lrtq: false, + encrypted_last_committed_rack_secret, + encrypted_last_committed_rack_secret_salt: salt, + }; + self.configuration.previous_configuration = + Some(previous_config); + + // Transition to sending `PrepareMsg`s for this configuration + let my_prepare_msg = + self.start_preparing_after_collecting_shares(now, outbox); + Some(my_prepare_msg) + } + op => { + warn!( + self.log, + "Share received when coordinator is not expecting it"; + "op" => op.name(), + "from" => %from + ); + None + } + } + } + + pub fn coordinator_status(&mut self) -> CoordinatorStatus { + (&self.op).into() + } + + // Transition from `CoordinationOperation::CollectShares` + // or `CoordinationOperation::CollectLrtqShares` to + // `CoordinationOperation::Prepare`. + // + // Return our own prepare message so it can be persisted. + // + // Panics if the current op is already `CoordinationOperation::Prepare`. + fn start_preparing_after_collecting_shares( + &mut self, + now: Instant, + outbox: &mut Vec, + ) -> PrepareMsg { + // Get the set of members in both the old and new group along with the + // shares mapped to all members in the new group. + let (existing_members, new_shares) = match &mut self.op { + CoordinatorOperation::CollectShares { + last_committed_members, + new_shares, + .. + } => { + let existing_members: BTreeSet<_> = + mem::take(last_committed_members).into_keys().collect(); + (existing_members, mem::take(new_shares)) + } + CoordinatorOperation::CollectLrtqShares { + last_committed_members, + new_shares, + .. + } => { + let existing_members: BTreeSet<_> = + mem::take(last_committed_members).into_keys().collect(); + (existing_members, mem::take(new_shares)) + } + CoordinatorOperation::Prepare { .. } => { + error!( + self.log, + "logic error: already preparing"; + "epoch" => %self.configuration.epoch, + ); + panic!( + "logic error: already preparing: epoch = {}", + self.configuration.epoch + ); + } + }; + + // Build up our set of `PrepareMsgs` + // + // `my_prepare_msg` is optional only so that we can fill it in via + // the loop. It will always become `Some`, as a `Configuration` always + // contains the coordinator as a member as validated by construction of + // `ValidatedReconfigureMsg`. 
+ let mut my_prepare_msg: Option<PrepareMsg> = None; + let mut prepares = BTreeMap::new(); + for (member, share) in new_shares { + if existing_members.contains(&member) { + let prepare_msg = + PrepareMsg { config: self.configuration.clone(), share }; + if *self.reconfigure_msg.coordinator_id() == member { + my_prepare_msg = Some(prepare_msg); + } else { + prepares.insert(member, prepare_msg); + } + } else { + // New members do not get sent information about the previous + // configuration. + let mut config = self.configuration.clone(); + config.previous_configuration = None; + let prepare_msg = PrepareMsg { config, share }; + prepares.insert(member, prepare_msg); + } + } + + // Actually transition to the new operation + self.op = CoordinatorOperation::Prepare { + prepares, + // Always include ourself + prepare_acks: BTreeSet::from([self + .reconfigure_msg + .coordinator_id() + .clone()]), + }; + + let last_committed_epoch = + self.configuration.previous_configuration.as_ref().unwrap().epoch; + info!( + self.log, + "Starting to prepare after collecting shares"; + "epoch" => %self.configuration.epoch, + // Safety: This whole method relies on having a previous configuration + "last_committed_epoch" => %last_committed_epoch + ); + + // Trigger sending of Prepare messages immediately + self.retry_deadline = now; + self.send_msgs(now, outbox); + + // Return our own `PrepareMsg` for persistence + // Safety: Construction of a `ValidatedReconfigureMsg` ensures that + // `my_platform_id` is part of the new configuration and has a share. + // We can therefore safely unwrap here. + my_prepare_msg.unwrap() + } } /// What should the coordinator be doing? pub enum CoordinatorOperation { // We haven't started implementing this yet - #[expect(unused)] CollectShares { - epoch: Epoch, - members: BTreeMap<PlatformId, Sha3_256Digest>, + last_committed_epoch: Epoch, + last_committed_members: BTreeMap<PlatformId, Sha3_256Digest>, + last_committed_threshold: Threshold, + + // Shares collected from `last_committed_members` collected_shares: BTreeMap<PlatformId, Share>, + + // New shares to be used when we get to the `Prepare` operation new_shares: BTreeMap<PlatformId, Share>, }, // We haven't started implementing this yet // Epoch is always 0 - #[allow(unused)] CollectLrtqShares { - members: BTreeMap<PlatformId, ShareDigestLrtq>, - shares: BTreeMap<PlatformId, LrtqShare>, + last_committed_members: BTreeMap<PlatformId, ShareDigestLrtq>, + last_committed_threshold: Threshold, + collected_shares: BTreeMap<PlatformId, LrtqShare>, + + // New shares to be used when we get to the `Prepare` operation + new_shares: BTreeMap<PlatformId, Share>, }, Prepare { /// The set of Prepares to send to each node @@ -244,3 +562,31 @@ impl CoordinatorOperation { } } } + +/// A summary of the coordinator's current operational status +pub enum CoordinatorStatus { + CollectShares { collected_from: BTreeSet<PlatformId> }, + CollectLrtqShares { collected_from: BTreeSet<PlatformId> }, + Prepare { acked: BTreeSet<PlatformId> }, +} + +impl From<&CoordinatorOperation> for CoordinatorStatus { + fn from(value: &CoordinatorOperation) -> Self { + match value { + CoordinatorOperation::CollectShares { + collected_shares, .. + } => CoordinatorStatus::CollectShares { + collected_from: collected_shares.keys().cloned().collect(), + }, + CoordinatorOperation::CollectLrtqShares { + collected_shares, + .. + } => CoordinatorStatus::CollectLrtqShares { + collected_from: collected_shares.keys().cloned().collect(), + }, + CoordinatorOperation::Prepare { prepare_acks, .. 
} => { + CoordinatorStatus::Prepare { acked: prepare_acks.clone() } + } + } + } +} diff --git a/trust-quorum/src/crypto.rs b/trust-quorum/src/crypto.rs index e5bb53cec59..c53ba8021a6 100644 --- a/trust-quorum/src/crypto.rs +++ b/trust-quorum/src/crypto.rs @@ -4,9 +4,13 @@ //! Various cryptographic constructs used by trust quroum. +use crate::{Epoch, Threshold}; use bootstore::trust_quorum::RackSecret as LrtqRackSecret; +use chacha20poly1305::{ChaCha20Poly1305, Key, KeyInit, aead, aead::Aead}; use derive_more::From; use gfss::shamir::{self, CombineError, SecretShares, Share, SplitError}; +use hkdf::Hkdf; +use omicron_uuid_kinds::{GenericUuid, RackUuid}; use rand::RngCore; use rand::rngs::OsRng; use secrecy::{DebugSecret, ExposeSecret, Secret}; @@ -15,9 +19,7 @@ use sha3::{Digest, Sha3_256}; use slog_error_chain::SlogInlineError; use std::fmt::Debug; use subtle::ConstantTimeEq; -use zeroize::{Zeroize, ZeroizeOnDrop}; - -use crate::Threshold; +use zeroize::{Zeroize, ZeroizeOnDrop, Zeroizing}; /// Each share contains a byte for the y-coordinate of 32 points on 32 different /// polynomials over Ed25519. All points share an x-coordinate, which is the 0th @@ -203,6 +205,15 @@ impl RackSecret { let secret = shamir::compute_secret(shares)?.try_into()?; Ok(secret) } + + pub fn reconstruct_from_iter<'a>( + shares: impl Iterator, + ) -> Result { + let mut shares: Vec = shares.cloned().collect(); + let res = RackSecret::reconstruct(&shares); + shares.zeroize(); + res + } } impl DebugSecret for RackSecret {} @@ -242,6 +253,63 @@ impl Default for Salt { } } +/// Encrypt the old rack secret with a key derived from the new rack secret. +/// +/// A random salt is generated and returned along with the encrypted secret. Key +/// derivation context includes `rack_id`, `old_epoch`, and `new_epoch`. +pub fn encrypt_old_rack_secret( + old_rack_secret: ReconstructedRackSecret, + new_rack_secret: ReconstructedRackSecret, + rack_id: RackUuid, + old_epoch: Epoch, + new_epoch: Epoch, +) -> aead::Result<(EncryptedRackSecret, Salt)> { + let salt = Salt::new(); + let cipher = derive_encryption_key_for_rack_secret( + new_rack_secret, + salt, + rack_id, + old_epoch, + new_epoch, + ); + + // This key is only used to encrypt one plaintext. A nonce of all zeroes is + // all that's required. + let nonce = [0u8; 12].into(); + let encrypted_rack_secret = EncryptedRackSecret( + cipher.encrypt(&nonce, old_rack_secret.expose_secret().as_ref())?, + ); + + Ok((encrypted_rack_secret, salt)) +} + +fn derive_encryption_key_for_rack_secret( + new_rack_secret: ReconstructedRackSecret, + salt: Salt, + rack_id: RackUuid, + old_epoch: Epoch, + new_epoch: Epoch, +) -> ChaCha20Poly1305 { + let prk = Hkdf::::new( + Some(&salt.0[..]), + new_rack_secret.expose_secret(), + ); + + // The "info" string is context to bind the key to its purpose + let mut key = Zeroizing::new([0u8; 32]); + prk.expand_multi_info( + &[ + b"trust-quorum-v1-rack-secret", + rack_id.as_untyped_uuid().as_ref(), + &new_epoch.0.to_be_bytes(), + &old_epoch.0.to_be_bytes(), + ], + key.as_mut(), + ) + .unwrap(); + ChaCha20Poly1305::new(Key::from_slice(key.as_ref())) +} + #[cfg(test)] mod tests { use super::*; diff --git a/trust-quorum/src/errors.rs b/trust-quorum/src/errors.rs new file mode 100644 index 00000000000..0a30fc81f15 --- /dev/null +++ b/trust-quorum/src/errors.rs @@ -0,0 +1,103 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. 
If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Various errors for the trust quorum APIs + +use crate::configuration::ConfigurationError; +use crate::{Epoch, PlatformId, Threshold}; +use omicron_uuid_kinds::RackUuid; + +#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] +pub enum CommitError { + #[error("invalid rack id")] + InvalidRackId( + #[from] + #[source] + MismatchedRackIdError, + ), + + #[error("missing prepare msg")] + MissingPrepare, + + #[error("prepare for a later configuration exists")] + OutOfOrderCommit, +} + +#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] +#[error( + "sled was decommissioned on msg from {from:?} at epoch {epoch:?}: last prepared epoch = {last_prepared_epoch:?}" +)] +pub struct SledDecommissionedError { + pub from: PlatformId, + pub epoch: Epoch, + pub last_prepared_epoch: Option<Epoch>, +} + +#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] +#[error("mismatched rack id: expected {expected:?}, got {got:?}")] +pub struct MismatchedRackIdError { + pub expected: RackUuid, + pub got: RackUuid, +} + +#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] +pub enum ReconfigurationError { + #[error("reconfiguration coordinator must be a member of the new group")] + CoordinatorMustBeAMemberOfNewGroup, + + #[error("upgrade from LRTQ required")] + UpgradeFromLrtqRequired, + + #[error( + "number of members: {num_members:?} must be greater than threshold: {threshold:?}" + )] + ThresholdMismatch { num_members: usize, threshold: Threshold }, + + #[error( + "invalid membership size: {0:?}: must be between 3 and 32 inclusive" + )] + InvalidMembershipSize(usize), + + #[error( + "invalid threshold: {0:?}: threshold must be between 2 and 31 inclusive" + )] + InvalidThreshold(Threshold), + + #[error( + "Node has last committed epoch of {node_epoch:?}, message contains {msg_epoch:?}" + )] + LastCommittedEpochMismatch { + node_epoch: Option<Epoch>, + msg_epoch: Option<Epoch>, + }, + + #[error( + "sled has already prepared a request at epoch {existing:?}, and cannot prepare another at a smaller or equivalent epoch {new:?}" + )] + PreparedEpochMismatch { existing: Epoch, new: Epoch }, + + #[error("invalid rack id in reconfigure msg")] + InvalidRackId( + #[from] + #[source] + MismatchedRackIdError, + ), + + #[error("cannot reconfigure a decommissioned sled")] + DecommissionedSled( + #[from] + #[source] + SledDecommissionedError, + ), + #[error( + "reconfiguration in progress at epoch {current_epoch:?}: cannot reconfigure for older epoch {msg_epoch:?}" + )] + ReconfigurationInProgress { current_epoch: Epoch, msg_epoch: Epoch }, + + #[error("mismatched reconfiguration requests for epoch {0:?}")] + MismatchedReconfigurationForSameEpoch(Epoch), + + #[error(transparent)] + Configuration(#[from] ConfigurationError), +} diff --git a/trust-quorum/src/lib.rs b/trust-quorum/src/lib.rs index b1dfe75d553..d1c72215980 100644 --- a/trust-quorum/src/lib.rs +++ b/trust-quorum/src/lib.rs @@ -15,6 +15,7 @@ use serde::{Deserialize, Serialize}; mod configuration; mod coordinator_state; pub(crate) mod crypto; +pub(crate) mod errors; mod messages; mod node; mod persistent_state; @@ -40,6 +41,13 @@ pub use persistent_state::{PersistentState, PersistentStateSummary}; )] pub struct Epoch(pub u64); +impl Epoch { + // Increment the epoch and return the new value + pub fn inc(&self) -> Epoch { + Epoch(self.0 + 1) + } +} + /// The number of shares required to reconstruct the rack secret /// /// Typically referred to as `k` in the docs 
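The membership and threshold bounds encoded in the `ReconfigurationError` variants above reduce to a small validation rule. A minimal standalone sketch of that rule as implied by the error strings; the enum and function names here are illustrative stand-ins, not the crate's `validators.rs` API:

    // Standalone sketch of the membership/threshold bounds implied by
    // `ReconfigurationError`; the error enum is a local stand-in.
    #[derive(Debug, PartialEq, Eq)]
    enum MembershipCheckError {
        InvalidMembershipSize(usize),
        InvalidThreshold(u8),
        ThresholdMismatch { num_members: usize, threshold: u8 },
    }

    fn check_membership(num_members: usize, threshold: u8) -> Result<(), MembershipCheckError> {
        // Membership size must be between 3 and 32 inclusive.
        if !(3..=32).contains(&num_members) {
            return Err(MembershipCheckError::InvalidMembershipSize(num_members));
        }
        // Threshold must be between 2 and 31 inclusive.
        if !(2..=31).contains(&threshold) {
            return Err(MembershipCheckError::InvalidThreshold(threshold));
        }
        // The number of members must be strictly greater than the threshold.
        if num_members <= threshold as usize {
            return Err(MembershipCheckError::ThresholdMismatch { num_members, threshold });
        }
        Ok(())
    }

    fn main() {
        assert_eq!(check_membership(5, 3), Ok(()));
        assert_eq!(
            check_membership(3, 3),
            Err(MembershipCheckError::ThresholdMismatch { num_members: 3, threshold: 3 })
        );
    }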
diff --git a/trust-quorum/src/node.rs b/trust-quorum/src/node.rs index a65e5e858df..daf60d0f0db 100644 --- a/trust-quorum/src/node.rs +++ b/trust-quorum/src/node.rs @@ -4,12 +4,15 @@ //! A trust quorum node that implements the trust quorum protocol -use crate::validators::{ReconfigurationError, ValidatedReconfigureMsg}; +use crate::errors::{CommitError, MismatchedRackIdError, ReconfigurationError}; +use crate::validators::ValidatedReconfigureMsg; use crate::{ CoordinatorState, Envelope, Epoch, PersistentState, PlatformId, messages::*, }; +use gfss::shamir::Share; +use omicron_uuid_kinds::RackUuid; -use slog::{Logger, o, warn}; +use slog::{Logger, error, info, o, warn}; use std::time::Instant; /// An entity capable of participating in trust quorum @@ -76,6 +79,88 @@ impl Node { Ok(persistent_state) } + /// Commit the configuration for the given epoch + pub fn commit_reconfiguration( + &mut self, + epoch: Epoch, + rack_id: RackUuid, + ) -> Result<Option<PersistentState>, CommitError> { + if self.persistent_state.last_committed_epoch() == Some(epoch) { + // Idempotent request + return Ok(None); + } + + // Only commit if we have a `PrepareMsg` and it's the latest `PrepareMsg`. + // + // This forces a global ordering of `PrepareMsg`s, because it's only + // possible to re-derive a key share in a `PrepareMsg` for the current + // configuration. + // + // In practice this check will always succeed if we have the + // `PrepareMsg` for this epoch, because later `Prepare` messages would + // not have been accepted before this commit arrived. This is because + // each `PrepareMsg` contains the `last_committed_epoch` that must have + // been seen in order to be accepted. If this commit hadn't occurred, + // then it wasn't part of the chain of `last_committed_epoch`s, and was + // abandoned/canceled. In that case, if we ended up getting a commit, + // then it would not inductively have been part of the existing chain + // and so would be a bug in the protocol execution. Because of this we + // check and error on this condition. + let last_prepared_epoch = self.persistent_state.last_prepared_epoch(); + if last_prepared_epoch != Some(epoch) { + error!( + self.log, + "Commit message occurred out of order"; + "epoch" => %epoch, + "last_prepared_epoch" => ?last_prepared_epoch + ); + return Err(CommitError::OutOfOrderCommit); + } + if let Some(prepare) = self.persistent_state.prepares.get(&epoch) { + if prepare.config.rack_id != rack_id { + error!( + self.log, + "Commit attempted with invalid rack_id"; + "expected" => %prepare.config.rack_id, + "got" => %rack_id + ); + return Err(CommitError::InvalidRackId( + MismatchedRackIdError { + expected: prepare.config.rack_id, + got: rack_id, + }, + )); + } + } else { + // This is an erroneous commit attempt from nexus. Log it. + // + // Nexus should instead tell this node to retrieve a `Prepare` + // from another node that has already committed. + error!( + self.log, + "tried to commit a configuration, but missing prepare msg"; + "epoch" => %epoch + ); + return Err(CommitError::MissingPrepare); + } + + info!(self.log, "Committed configuration"; "epoch" => %epoch); + + // Are we currently coordinating for this epoch? 
+ // Stop coordinating if we are + if self.coordinator_state.is_some() { + info!( + self.log, + "Stopping coordination due to commit"; + "epoch" => %epoch + ); + self.coordinator_state = None; + } + + self.persistent_state.commits.insert(epoch); + Ok(Some(self.persistent_state.clone())) + } + /// Process a timer tick /// /// Ticks are issued by the caller in order to move the protocol forward. @@ -87,8 +172,8 @@ impl Node { /// Handle a message from another node pub fn handle( &mut self, - _now: Instant, - _outbox: &mut Vec, + now: Instant, + outbox: &mut Vec, from: PlatformId, msg: PeerMsg, ) -> Option { @@ -97,6 +182,9 @@ impl Node { self.handle_prepare_ack(from, epoch); None } + PeerMsg::Share { epoch, share } => { + self.handle_share(now, outbox, from, epoch, share) + } _ => todo!( "cannot handle message variant yet - not implemented: {msg:?}" ), @@ -108,6 +196,7 @@ impl Node { self.coordinator_state.as_ref() } + // Handle receipt of a `PrepareAck` message fn handle_prepare_ack(&mut self, from: PlatformId, epoch: Epoch) { // Are we coordinating for this epoch? if let Some(cs) = &mut self.coordinator_state { @@ -133,6 +222,39 @@ impl Node { } } + // Handle receipt of a `Share` message + fn handle_share( + &mut self, + now: Instant, + outbox: &mut Vec, + from: PlatformId, + epoch: Epoch, + share: Share, + ) -> Option { + // Are we coordinating and expecting shares for `epoch`, which must + // be the last committed epoch? + if let Some(cs) = &mut self.coordinator_state { + if let Some(my_prepare_msg) = + cs.handle_share(now, outbox, from, epoch, share) + { + // Add the prepare to our `PersistentState` + self.persistent_state + .prepares + .insert(my_prepare_msg.config.epoch, my_prepare_msg); + + return Some(self.persistent_state.clone()); + } + } else { + warn!( + self.log, + "Received share when not coordinating"; + "from" => %from, + "epoch" => %epoch + ); + } + None + } + // Send any required messages as a reconfiguration coordinator fn send_coordinator_msgs( &mut self, @@ -175,7 +297,9 @@ impl Node { return Ok(Some(self.persistent_state.clone())); } - // We have a committed configuration that is not LRTQ + // Safety: We have a committed configuration that is not LRTQ + let our_share = + self.persistent_state.last_committed_key_share().unwrap(); let config = self.persistent_state.last_committed_configuration().unwrap(); @@ -184,6 +308,7 @@ impl Node { now, msg, &config, + our_share, )?); Ok(None) diff --git a/trust-quorum/src/persistent_state.rs b/trust-quorum/src/persistent_state.rs index 32aceebca4d..a3d1461454c 100644 --- a/trust-quorum/src/persistent_state.rs +++ b/trust-quorum/src/persistent_state.rs @@ -7,13 +7,13 @@ //! Note that this state is not necessarily directly serialized and saved. use crate::crypto::LrtqShare; -use crate::messages::{CommitMsg, PrepareMsg}; +use crate::messages::PrepareMsg; use crate::{Configuration, Epoch, PlatformId}; use bootstore::schemes::v0::SharePkgCommon as LrtqShareData; use gfss::shamir::Share; use omicron_uuid_kinds::{GenericUuid, RackUuid}; use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; /// All the persistent state for this protocol #[derive(Debug, Clone, Serialize, Deserialize, Default)] @@ -22,7 +22,7 @@ pub struct PersistentState { // data it read from disk. This allows us to upgrade from LRTQ. 
pub lrtq: Option, pub prepares: BTreeMap, - pub commits: BTreeMap, + pub commits: BTreeSet, // Has the node seen a commit for an epoch higher than it's current // configuration for which it has not received a `PrepareMsg` for? If at @@ -37,7 +37,7 @@ impl PersistentState { PersistentState { lrtq: None, prepares: BTreeMap::new(), - commits: BTreeMap::new(), + commits: BTreeSet::new(), decommissioned: None, } } @@ -66,7 +66,7 @@ impl PersistentState { } pub fn last_committed_epoch(&self) -> Option { - self.commits.keys().last().map(|epoch| *epoch) + self.commits.last().map(|epoch| *epoch) } // Get the configuration for the current epoch from its prepare message @@ -88,7 +88,7 @@ impl PersistentState { // Return the key share for the latest committed trust quorum configuration // if one exists - pub fn key_share(&self) -> Option { + pub fn last_committed_key_share(&self) -> Option { self.last_committed_epoch().map(|epoch| { self.prepares.get(&epoch).expect("missing prepare").share.clone() }) diff --git a/trust-quorum/src/validators.rs b/trust-quorum/src/validators.rs index 548377a7ff6..8ec4a5d5a55 100644 --- a/trust-quorum/src/validators.rs +++ b/trust-quorum/src/validators.rs @@ -4,7 +4,9 @@ //! Various validation functions to be used by a [`crate::Node`] -use crate::configuration::ConfigurationError; +use crate::errors::{ + MismatchedRackIdError, ReconfigurationError, SledDecommissionedError, +}; use crate::messages::ReconfigureMsg; use crate::{Epoch, PersistentStateSummary, PlatformId, Threshold}; use omicron_uuid_kinds::RackUuid; @@ -44,84 +46,6 @@ fn check_in_service( Ok(()) } -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] -#[error( - "sled was decommissioned on msg from {from:?} at epoch {epoch:?}: last prepared epoch = {last_prepared_epoch:?}" -)] -pub struct SledDecommissionedError { - from: PlatformId, - epoch: Epoch, - last_prepared_epoch: Option, -} - -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] -#[error("mismatched rack id: expected {expected:?}, got {got:?}")] -pub struct MismatchedRackIdError { - pub expected: RackUuid, - pub got: RackUuid, -} - -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] -pub enum ReconfigurationError { - #[error("reconfiguration coordinator must be a member of the new group")] - CoordinatorMustBeAMemberOfNewGroup, - - #[error("upgrade from LRTQ required")] - UpgradeFromLrtqRequired, - - #[error( - "number of members: {num_members:?} must be greater than threshold: {threshold:?}" - )] - ThresholdMismatch { num_members: usize, threshold: Threshold }, - - #[error( - "invalid membership size: {0:?}: must be between 3 and 32 inclusive" - )] - InvalidMembershipSize(usize), - - #[error( - "invalid threshold: {0:?}: threshold must be between 2 and 31 inclusive" - )] - InvalidThreshold(Threshold), - - #[error( - "Node has last committed epoch of {node_epoch:?}, message contains {msg_epoch:?}" - )] - LastCommittedEpochMismatch { - node_epoch: Option, - msg_epoch: Option, - }, - - #[error( - "sled has already prepared a request at epoch {existing:?}, and cannot prepare another at a smaller or equivalent epoch {new:?}" - )] - PreparedEpochMismatch { existing: Epoch, new: Epoch }, - - #[error("invalid rack id in reconfigure msg")] - InvalidRackId( - #[from] - #[source] - MismatchedRackIdError, - ), - - #[error("cannot reconfigure a decommissioned sled")] - DecommissionedSled( - #[from] - #[source] - SledDecommissionedError, - ), - #[error( - "reconfiguration in progress at epoch {current_epoch:?}: cannot reconfigure for older epoch 
{msg_epoch:?}" - )] - ReconfigurationInProgress { current_epoch: Epoch, msg_epoch: Epoch }, - - #[error("mismatched reconfiguration requests for epoch {0:?}")] - MismatchedReconfigurationForSameEpoch(Epoch), - - #[error(transparent)] - Configuration(#[from] ConfigurationError), -} - /// A `ReconfigureMsg` that has been determined to be valid for the remainder /// of code paths. We encode this check into a type in a "parse, don't validate" /// manner. diff --git a/trust-quorum/tests/coordinator.rs b/trust-quorum/tests/coordinator.rs index d9bb0237f8f..13993595b66 100644 --- a/trust-quorum/tests/coordinator.rs +++ b/trust-quorum/tests/coordinator.rs @@ -6,6 +6,7 @@ use assert_matches::assert_matches; use bcs::Result; +use gfss::shamir::Share; use omicron_test_utils::dev::test_setup_log; use omicron_uuid_kinds::RackUuid; use prop::sample::Index; @@ -49,6 +50,9 @@ impl Sut { /// the SUT node itself to understand external events like messages that got /// sent in response to a tick. pub struct Model { + // A copy of our id + pub id: PlatformId, + /// The current time pub now: Instant, @@ -60,8 +64,9 @@ pub struct Model { } impl Model { - pub fn new() -> Model { + pub fn new(coordinator_id: PlatformId) -> Model { Model { + id: coordinator_id, now: Instant::now(), last_committed_config_msg: None, coordinator_state: None, @@ -91,11 +96,9 @@ impl Model { // We don't currently collect LRTQ shares. This conditional // will have to be more specific when we do. let op = if self.last_committed_config_msg.is_some() { - ModelCoordinatorOp::CollectShares { - collected_from: BTreeSet::new(), - } + ModelCoordinatorOp::new_collect_shares(self.id.clone()) } else { - ModelCoordinatorOp::Prepare { acked: BTreeSet::new() } + ModelCoordinatorOp::new_prepare(self.id.clone()) }; self.coordinator_state = Some(ModelCoordinatorState { @@ -105,6 +108,11 @@ impl Model { }); } + pub fn commit(&mut self, msg: ReconfigureMsg) { + self.last_committed_config_msg = Some(msg); + self.coordinator_state = None; + } + // If we are currently preparing this epoch then record the ack pub fn ack_prepare(&mut self, from: PlatformId, epoch: Epoch) { if let Some(cs) = &mut self.coordinator_state { @@ -114,6 +122,47 @@ impl Model { } } + /// If we are currently waiting for shares in this epoch then record the + /// responder. + /// + /// If we've received enough shares to recompute the rack secret then + /// transition the model operation to `Prepare` and return `Ok(true)`. + pub fn handle_share( + &mut self, + from: PlatformId, + epoch: Epoch, + ) -> Result { + let Some(last_committed_config_msg) = &self.last_committed_config_msg + else { + return Ok(false); + }; + let Some(cs) = &mut self.coordinator_state else { + return Ok(false); + }; + + // Make sure our model state is consistent + prop_assert_eq!( + Some(last_committed_config_msg.epoch), + cs.msg.last_committed_epoch + ); + + // Record the share + if last_committed_config_msg.epoch == epoch { + cs.op.handle_share(from); + } + + // Do we have a threshold of shares? If so, we should transition our + // operation to `Prepare`. + if cs.op.collected_shares().unwrap_or(&BTreeSet::new()).len() + == last_committed_config_msg.threshold.0 as usize + { + cs.op = ModelCoordinatorOp::new_prepare(self.id.clone()); + return Ok(true); + } + + Ok(false) + } + /// Return nodes that have acked prepares for an ongoing coordination /// by the Model. 
/// @@ -122,6 +171,65 @@ impl Model { self.coordinator_state.as_ref().and_then(|cs| cs.op.acked_prepares()) } + /// Return the nodes that we are waiting for prepare acknowledgements from + pub fn waiting_for_prepare_acks_from( + &self, + ) -> Option> { + // We need to actually be coordinating to wait for shares + let Some(cs) = &self.coordinator_state else { + return None; + }; + // We need to actually be waiting for acks + let Some(acked_prepares) = self + .coordinator_state + .as_ref() + .and_then(|cs| cs.op.acked_prepares()) + else { + return None; + }; + + // Return all members of the current configuration that haven't acked a + // `PrepareMsg` yet. Exclude ourself. + + let mut waiting_for: BTreeSet<_> = + cs.msg.members.difference(acked_prepares).cloned().collect(); + waiting_for.remove(&self.id); + Some(waiting_for) + } + + // Return the nodes that we are waiting for shares from + pub fn waiting_for_shares_from(&self) -> Option> { + // We only wait for shares if we have a prior committed config + let Some(last_committed_config_msg) = &self.last_committed_config_msg + else { + return None; + }; + // We need to actually be coordinating to wait for shares + let Some(cs) = &self.coordinator_state else { + return None; + }; + // We need to actually be collecting shares as a coordinator + let Some(collected_from) = cs.op.collected_shares() else { + return None; + }; + + // Return all members of the last committed configuration that we + // haven't received a share from yet. Exclude ourself. + + let mut waiting_for: BTreeSet<_> = last_committed_config_msg + .members + .difference(collected_from) + .cloned() + .collect(); + waiting_for.remove(&self.id); + Some(waiting_for) + } + + // Return the current epoch being coordinated, if there is one + fn coordinating_epoch(&self) -> Option { + self.coordinator_state.as_ref().map(|cs| cs.msg.epoch) + } + /// Return the members of the current reconfiguration being coordinated. /// /// Return the empty set if there is no ongoing coordination. @@ -149,6 +257,20 @@ pub enum ModelCoordinatorOp { } impl ModelCoordinatorOp { + pub fn new_collect_shares(ourself: PlatformId) -> Self { + ModelCoordinatorOp::CollectShares { + // Always include ourself + collected_from: BTreeSet::from([ourself]), + } + } + + pub fn new_prepare(ourself: PlatformId) -> Self { + ModelCoordinatorOp::Prepare { + // Always include ourself + acked: BTreeSet::from([ourself]), + } + } + pub fn ack_prepare(&mut self, from: PlatformId) { if let ModelCoordinatorOp::Prepare { acked } = self { acked.insert(from); @@ -162,6 +284,32 @@ impl ModelCoordinatorOp { None } } + + pub fn handle_share(&mut self, from: PlatformId) { + if let ModelCoordinatorOp::CollectShares { collected_from } = self { + collected_from.insert(from); + } + } + + pub fn collected_shares(&self) -> Option<&BTreeSet> { + if let ModelCoordinatorOp::CollectShares { collected_from } = self { + Some(collected_from) + } else { + None + } + } + + pub fn is_collecting_shares(&self) -> bool { + matches!(self, ModelCoordinatorOp::CollectShares { .. }) + } + + pub fn is_collecting_lrtq_shares(&self) -> bool { + matches!(self, ModelCoordinatorOp::CollectLrtqShares { .. }) + } + + pub fn is_preparing(&self) -> bool { + matches!(self, ModelCoordinatorOp::Prepare { .. }) + } } // The test itself maintains an internal state that includes not only the state @@ -169,37 +317,69 @@ impl ModelCoordinatorOp { // tests via `Action`s. 
struct TestState { /// Our system under test - pub sut: Sut, + sut: Sut, /// The abstract model of our SUT - pub model: Model, + model: Model, // All in flight messages to nodes that are not the SUT - pub network_msgs: BTreeMap>, + network_msgs: BTreeMap>, // Messages delivered to each node that is not the SUT - pub delivered_msgs: BTreeMap>, + delivered_msgs: BTreeMap>, + + // Shares delivered to nodes that are not the SUT. + // + // We track these so that we can implement replies for `GetShare` messages. + delivered_shares: BTreeMap<(Epoch, PlatformId), Share>, + + // A cache of our member universe, so we only have to generate it once + member_universe: Vec, + + // The last epoch for a `PrepareMsg` sent by Nexus + // In production this is stored in CRDB, but it is simulated here + highest_prepared_epoch: Epoch, + + // We reuse the same coordinator ID across all test runs + // There's no reason to randomize it. + coordinator_id: PlatformId, + + // No reason to change the rack_id + rack_id: RackUuid, } impl TestState { - pub fn new(log: Logger, coordinator_id: PlatformId) -> TestState { + pub fn new(log: Logger) -> TestState { + let member_universe = member_universe(); + let coordinator_id = member_universe[0].clone(); TestState { sut: Sut { - node: Node::new(log, coordinator_id, PersistentState::empty()), + node: Node::new( + log, + coordinator_id.clone(), + PersistentState::empty(), + ), persistent_state: PersistentState::empty(), }, - model: Model::new(), + model: Model::new(coordinator_id.clone()), network_msgs: BTreeMap::new(), delivered_msgs: BTreeMap::new(), + delivered_shares: BTreeMap::new(), + member_universe, + highest_prepared_epoch: Epoch(0), + coordinator_id, + rack_id: RackUuid::new_v4(), } } pub fn action_coordinate_reconfiguration( &mut self, - msg: ReconfigureMsg, + generated_config: GeneratedConfiguration, ) -> Result<(), TestCaseError> { let mut outbox = Vec::new(); + let msg = self.generated_config_to_reconfigure_msg(generated_config); + // Update the model state self.model.action_coordinate_reconfiguration(msg.clone()); @@ -233,9 +413,15 @@ impl TestState { self.send(outbox.into_iter()); } None => { - // The request is idempotent - // No action should have been taken - prop_assert!(outbox.is_empty()); + // This is a reconfiguration, so before a `PersistentState` + // is returned we must collect share for the last committed + // configuration. + self.assert_envelopes_after_coordinate_reconfiguration( + &outbox, + )?; + + // We validated our messages. Let's put them into our test state as "in-flight". + self.send(outbox.into_iter()); } } @@ -252,12 +438,21 @@ impl TestState { Action::DeliverMsgs(indices) => { self.action_deliver_msgs(indices); } + Action::DropMessages(indices) => { + self.action_drop_msgs(indices); + } Action::Reply(indices) => { self.action_reply(indices)?; } Action::Tick(time_jump) => { self.action_tick(time_jump)?; } + Action::Commit(index) => { + self.action_commit(index)?; + } + Action::CoordinateReconfiguration(generated_config) => { + self.action_coordinate_reconfiguration(generated_config)?; + } } } Ok(()) @@ -273,6 +468,14 @@ impl TestState { for index in indices { let id = index.get(&destinations); if let Some(msg) = self.network_msgs.get_mut(id).unwrap().pop() { + // If the message is a `Prepare`, also save its share so we can + // reply to `GetShare` messages. 
+ if let PeerMsg::Prepare(prepare) = &msg { + self.delivered_shares.insert( + (prepare.config.epoch, id.clone()), + prepare.share.clone(), + ); + } let msgs = self.delivered_msgs.entry(id.clone()).or_default(); msgs.push(msg); } @@ -282,6 +485,22 @@ impl TestState { self.network_msgs.retain(|_, msgs| !msgs.is_empty()); } + // Drop network messages destined for generated destinations + fn action_drop_msgs(&mut self, indices: Vec) { + let destinations: Vec<_> = self.network_msgs.keys().cloned().collect(); + if destinations.is_empty() { + // nothing to do + return; + } + for index in indices { + let id = index.get(&destinations); + let _ = self.network_msgs.get_mut(id).unwrap().pop(); + } + + // Remove any destinations with zero messages in-flight + self.network_msgs.retain(|_, msgs| !msgs.is_empty()); + } + /// Send replies from nodes with delivered messages fn action_reply( &mut self, @@ -303,6 +522,9 @@ impl TestState { PeerMsg::Prepare(prepare_msg) => { self.reply_to_prepare_msg(from.clone(), prepare_msg)?; } + PeerMsg::GetShare(epoch) => { + self.reply_to_get_share_msg(from.clone(), epoch)?; + } _ => todo!(), } } @@ -327,35 +549,142 @@ impl TestState { // If time has advanced past the coordinator's retry deadline // then we must see if we expected any retries to be sent. if timer_expired { - // Get the members of the current configuration being coordinated - let members = self.model.active_reconfiguration_members(); + self.assert_expected_outgoing_envelopes(&outbox)?; + } + + // Put any output messages onto the network + self.send(outbox.into_iter()); + + Ok(()) + } - // We aren't coordinating - if members.is_empty() { - prop_assert!(outbox.is_empty()); - return Ok(()); + fn assert_expected_outgoing_envelopes( + &self, + outbox: &[Envelope], + ) -> Result<(), TestCaseError> { + if let Some(expected) = self.model.waiting_for_prepare_acks_from() { + for envelope in outbox { + assert_matches!(envelope.msg, PeerMsg::Prepare(_)); + prop_assert!(expected.contains(&envelope.to)); } + // We are only sending either `Prepare` or `GetShare` variants, not both. + return Ok(()); + } - if let Some(acked_members) = self.model.acked_prepares() { - // We expect retries for all members that the coordinator - // has not received acks for. - let expected: BTreeSet<_> = - members.difference(acked_members).collect(); - for envelope in &outbox { - prop_assert!(expected.contains(&envelope.to)); - } - } else { - // We aren't waiting on acks, so won't retry sending prepares - prop_assert!(outbox.is_empty()); + if let Some(expected) = self.model.waiting_for_shares_from() { + for envelope in outbox { + assert_matches!(envelope.msg, PeerMsg::GetShare(_)); + prop_assert!(expected.contains(&envelope.to)); } } - // Put any output messages onto the network - self.send(outbox.into_iter()); + Ok(()) + } + + // Precondition: Nexus will only actually commit if enough nodes + // have acked the `PrepareMsg` from the coordinator. Let's simulate + // that by checking our model state. + // + // Return `Some(reconfigure_msg)` if Nexus would decide to commit, `None` otherwise. + fn action_commit_precondition( + &mut self, + index: Index, + ) -> Option { + // If we aren't currently coordinating, then just return as there's no + // configuration to commit. + let Some(cs) = &self.model.coordinator_state else { + return None; + }; + + // Compute the safety threshold used by nexus + // Nexus will choose a constant, but we vary it because we can. 
+ let max_possible_z = cs.msg.members.len() - cs.msg.threshold.0 as usize; + let z = + if max_possible_z == 0 { 0 } else { index.index(max_possible_z) }; + + // We can only commit when the coordinator has seen a `PrepareAck` from + // K + Z nodes. + // + // Justification: The coordinator does not know what `Z` is and cannot + // know if it has seen enough acks to know if it's safe to commit, so + // nexus must not try. Furthermore, Nexus informs non-coordinator nodes + // when to commit and they have no way to know if enough nodes have + // shares as a result of receiving `PrepareMsg`s. Because of this, we + // rely on Nexus to reliably uphold this correctness invariant, and + // simulate it in the test. + let acked = cs.op.acked_prepares().map(|s| s.len()).unwrap_or(0); + if acked >= cs.msg.threshold.0 as usize + z { + Some(cs.msg.clone()) + } else { + None + } + } + + fn action_commit(&mut self, index: Index) -> Result<(), TestCaseError> { + let Some(msg) = self.action_commit_precondition(index) else { + // We failed our precondition check. This isn't a test failure. It + // just means that the coordinator is not ready to commit. + return Ok(()); + }; + + // Commit the configuration at our node + let persistent_state = self + .sut + .node + .commit_reconfiguration(msg.epoch, msg.rack_id)? + .expect("persistent state has been updated"); + + // Check the output of the persistent state + self.assert_persistent_state_after_commit_reconfiguration( + msg.epoch, + &persistent_state, + )?; + + // Save the persistent state + self.sut.persistent_state = persistent_state; + + // Update our model state + self.model.commit(msg); Ok(()) } + fn generated_config_to_reconfigure_msg( + &mut self, + config: GeneratedConfiguration, + ) -> ReconfigureMsg { + let mut members: BTreeSet = config + .members + .iter() + .map(|index| self.member_universe[*index].clone()) + .collect(); + + // Ensure that the configuration always includes the coordinator + members.insert(self.coordinator_id.clone()); + + let threshold = Threshold(usize::max( + 2, + config.threshold.index(members.len()), + ) as u8); + + // Increment the epoch for this configuration. This simulates what Nexus + // would do. + self.highest_prepared_epoch = self.highest_prepared_epoch.inc(); + + ReconfigureMsg { + rack_id: self.rack_id, + epoch: self.highest_prepared_epoch, + last_committed_epoch: self + .model + .last_committed_config_msg + .as_ref() + .map(|msg| msg.epoch), + members, + threshold, + retry_timeout: Duration::from_millis(RETRY_TIMEOUT_MS), + } + } + fn reply_to_prepare_msg( &mut self, from: PlatformId, @@ -384,13 +713,90 @@ impl TestState { // Ensure that if the SUT is waiting for prepares, that the ack // is accounted for. - if let Some(acks) = self.model.acked_prepares() { - prop_assert!(acks.contains(&from)); + // + // We only perform this check if this reply is for the current epoch + if self.model.coordinating_epoch() == Some(msg.config.epoch) { + if let Some(acks) = self.model.acked_prepares() { + prop_assert!(acks.contains(&from)); + } } Ok(()) } + fn reply_to_get_share_msg( + &mut self, + from: PlatformId, + epoch: Epoch, + ) -> Result<(), TestCaseError> { + // If we have a share, then return it. Otherwise return nothing. + // In the future we may implement other behaviors. 
+ let Some(share) = self.delivered_shares.get(&(epoch, from.clone())) + else { + return Ok(()); + }; + let reply = PeerMsg::Share { epoch, share: share.clone() }; + let mut outbox = Vec::new(); + let output = self.sut.node.handle( + self.model.now, + &mut outbox, + from.clone(), + reply, + ); + + // If we just received a threshold number of shares, we expect + // reconstruction of the rack secret for the last committed + // configuration and the SUT to start sending prepare messages. As part + // of this transition the persistent state with the SUT's own latest + // `PrepareMsg` is also returned. + let start_preparing = self.model.handle_share(from.clone(), epoch)?; + if start_preparing { + // We just transitioned to preparing + let Some(persistent_state) = output else { + return Err(TestCaseError::fail( + "Persistent state should exist", + )); + }; + + // The same checks should hold. + // TODO: Perhaps we should rename this method. + self.assert_persistent_state_after_coordinate_reconfiguration( + &persistent_state, + )?; + + // We validated our persistent state is correct. Save it and move + // on. + self.sut.persistent_state = persistent_state; + } + + self.assert_expected_outgoing_envelopes(&outbox)?; + + self.send(outbox.into_iter()); + + Ok(()) + } + + /// Ensure that the output of `Node::commit_reconfiguration` is valid + /// given the test state. + fn assert_persistent_state_after_commit_reconfiguration( + &self, + epoch: Epoch, + persistent_state: &PersistentState, + ) -> Result<(), TestCaseError> { + // We should have one new commit for epoch: Epoch + prop_assert_eq!( + self.sut.persistent_state.commits.len() + 1, + persistent_state.commits.len() + ); + assert!(!self.sut.persistent_state.commits.contains(&epoch)); + assert!(persistent_state.commits.contains(&epoch)); + + // The SUT node should no longer be coordinating + assert!(self.sut.node.get_coordinator_state().is_none()); + + Ok(()) + } + /// Ensure that the output of `Node::coordinate_reconfiguration` /// is valid given the `TestState`. /// @@ -409,6 +815,7 @@ impl TestState { ))? .msg; + // TODO: This will have to change once we start supporting LRTQ prop_assert!(persistent_state.lrtq.is_none()); prop_assert_eq!( &sut.persistent_state.commits, @@ -447,37 +854,58 @@ impl TestState { outbox: &[Envelope], ) -> Result<(), TestCaseError> { let sut = &self.sut; - let msg = &self - .model - .coordinator_state - .as_ref() - .ok_or(TestCaseError::fail( - "Model should have a coordinator state", - ))? - .msg; - - let config = sut.persistent_state.configuration(msg.epoch).unwrap(); - - // Ensure the members of the configuration match the model msg - prop_assert_eq!( - &msg.members, - &config.members.keys().cloned().collect() - ); + let model_cs = &self.model.coordinator_state.as_ref().ok_or( + TestCaseError::fail("Model should have a coordinator state"), + )?; // The coordinator should send messages to every node but itself - assert_eq!(outbox.len(), config.members.len() - 1); for envelope in outbox { - assert_matches!( - &envelope.msg, - PeerMsg::Prepare(PrepareMsg { config: prepare_config, .. }) => { - assert_eq!(*config, *prepare_config); - } - ); - prop_assert_eq!(&envelope.from, &config.coordinator); + if model_cs.op.is_preparing() { + // We should be sending to every member of the current config + // except ourself. + assert_eq!(outbox.len(), model_cs.msg.members.len() - 1); + + // Safety: We'll always have this configuration in the SUT persistent + // state if we are preparing. 
+ let config = sut + .persistent_state + .configuration(model_cs.msg.epoch) + .expect("config for this epoch: {epoch}"); + + assert_matches!( + &envelope.msg, + PeerMsg::Prepare(PrepareMsg { config: prepare_config, .. }) => { + assert_eq!(*config, *prepare_config); + } + ); + } else { + // We should be sending to every member of the previous config + // except ourself. + // + // Safety: If we got here, then we have a prior committed + // configuration. + assert_eq!( + outbox.len(), + self.model + .last_committed_config_msg + .as_ref() + .unwrap() + .members + .len() + - 1 + ); + assert_matches!( + &envelope.msg, + PeerMsg::GetShare(epoch) => { + assert_eq!(*epoch, model_cs.msg.last_committed_epoch.unwrap()); + } + ); + } + prop_assert_eq!(&envelope.from, &self.coordinator_id); // We don't send a prepare to ourselves. We put it in our // `PersistentState` directly. - prop_assert_ne!(&envelope.to, &config.coordinator); + prop_assert_ne!(&envelope.to, &self.coordinator_id); } Ok(()) @@ -514,7 +942,7 @@ pub enum Action { /// use a Vec. We don't worry about shuffling messsages so they are out of /// order, because we use stream oriented transport. //#[weight(3)] - //DropMessages(Vec), + DropMessages(Vec), /// Fake a reply for a delivered message. /// @@ -522,12 +950,13 @@ pub enum Action { /// `test_state.delivered_msgs`. #[weight(45)] Reply(Vec), + // Start a reconfiguration that has a possibility of succeeding // given delivered messages. // // It will never fail a validation check. //#[weight(2)] - //StartReconfiguration(GeneratedConfiguration), + CoordinateReconfiguration(GeneratedConfiguration), // A tick duration in milliseconds #[weight(50)] @@ -538,6 +967,17 @@ pub enum Action { )] Duration, ), + + // Attempt to Commit a reconfiguration if possible + // + // We'll only attempt to commit a reconfiguration if K+Z nodes + // have acked prepares for that epoch. + // + // Z is randomly chosen between 0 and N-K, and is a "safety parameter" + // described in RFD 238. + // + // `Index` is used to compute Z here. + Commit(Index), } /// Informnation about configurations used at test generation time @@ -552,7 +992,13 @@ pub struct GeneratedConfiguration { /// still be duplicated due to the shift implementation used. Therefore we /// instead just choose from a constrained set of usize values that we can /// use directly as indexes into our fixed size structure for all tests. - #[strategy(btree_set(0..=255usize, MIN_CLUSTER_SIZE..=MAX_CLUSTER_SIZE))] + /// + /// Note that we intentionally set the max set size to MAX_CLUSTER_SIZE-1. + /// This is because we always want to include the coordinator in the + /// configuration, but it's value may not be chosen randomly. In this case, + /// we have to add it to the actual membership set we generate from this + /// configuration with [`TestState::generated_config_to_reconfigure_msg`]. + #[strategy(btree_set(0..=255usize, MIN_CLUSTER_SIZE..MAX_CLUSTER_SIZE))] pub members: BTreeSet, /// An index is roughly equivalent to a threshold, since a threshold cannot @@ -584,60 +1030,13 @@ pub struct TestInput { fn test_coordinator_behavior_from_empty_state(input: TestInput) { let logctx = test_setup_log("coordinator_behavior_from_empty_state"); - let universe = member_universe(); - let rack_id = RackUuid::new_v4(); - - // We don't know the coordinator ID until we translate the indexes in the - // test input to `PlatformId`s. 
- let coordinator_id = None; - let initial_config_msg = generated_config_to_reconfigure_msg( - &universe, - coordinator_id, - Epoch(1), - rack_id, - input.initial_config, - ); - - let coordinator_id = initial_config_msg.members.first().unwrap().clone(); - let mut state = TestState::new(logctx.log.clone(), coordinator_id); + let mut state = TestState::new(logctx.log.clone()); // Start the initial configuration - state.action_coordinate_reconfiguration(initial_config_msg)?; + state.action_coordinate_reconfiguration(input.initial_config)?; // Start executing the actions state.run_actions(input.actions)?; logctx.cleanup_successful(); } - -fn generated_config_to_reconfigure_msg( - member_universe: &[PlatformId], - coordinator_id: Option<&PlatformId>, - epoch: Epoch, - rack_id: RackUuid, - config: GeneratedConfiguration, -) -> ReconfigureMsg { - let mut members: BTreeSet = config - .members - .iter() - .map(|index| member_universe[*index].clone()) - .collect(); - - if let Some(coordinator_id) = coordinator_id { - members.insert(coordinator_id.clone()); - } - - println!("members = {members:#?}"); - - let threshold = - Threshold(usize::max(2, config.threshold.index(members.len())) as u8); - - ReconfigureMsg { - rack_id, - epoch, - last_committed_epoch: None, - members, - threshold, - retry_timeout: Duration::from_millis(RETRY_TIMEOUT_MS), - } -} From 9858e7be8a1a983ec4e78c9d8c7ffe045a9d3dd7 Mon Sep 17 00:00:00 2001 From: "Andrew J. Stone" Date: Fri, 25 Apr 2025 23:08:34 +0000 Subject: [PATCH 2/4] Reduce send_msgs filter condition It's no longer necessary to filter out the coordinator explicitly, as it's share is always included in the `collected_shares` upon construction. --- trust-quorum/src/coordinator_state.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/trust-quorum/src/coordinator_state.rs b/trust-quorum/src/coordinator_state.rs index 3100ae5bd77..80a99da1519 100644 --- a/trust-quorum/src/coordinator_state.rs +++ b/trust-quorum/src/coordinator_state.rs @@ -187,10 +187,7 @@ impl CoordinatorState { // Also exclude ourself. for member in last_committed_members .keys() - .filter(|&m| { - m != self.reconfigure_msg.coordinator_id() - && !collected_shares.contains_key(m) - }) + .filter(|&m| !collected_shares.contains_key(m)) .cloned() { outbox.push(Envelope { From bc17d08e15bc54f77b31107b1fc186c6a1d17c92 Mon Sep 17 00:00:00 2001 From: "Andrew J. Stone" Date: Fri, 25 Apr 2025 23:11:03 +0000 Subject: [PATCH 3/4] ugh, missed a comment --- trust-quorum/src/coordinator_state.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/trust-quorum/src/coordinator_state.rs b/trust-quorum/src/coordinator_state.rs index 80a99da1519..63f99524c8f 100644 --- a/trust-quorum/src/coordinator_state.rs +++ b/trust-quorum/src/coordinator_state.rs @@ -184,7 +184,6 @@ impl CoordinatorState { .. } => { // Send to all members that we haven't yet collected shares from. - // Also exclude ourself. for member in last_committed_members .keys() .filter(|&m| !collected_shares.contains_key(m)) From cbb7db423a38d5862e3d6db8a5cd5ff5476a7fd2 Mon Sep 17 00:00:00 2001 From: "Andrew J. 
Stone" Date: Fri, 25 Apr 2025 23:19:53 +0000 Subject: [PATCH 4/4] Remove another redundant removal of coordinator id --- trust-quorum/src/coordinator_state.rs | 1 - trust-quorum/tests/coordinator.rs | 25 ++++++++++--------------- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/trust-quorum/src/coordinator_state.rs b/trust-quorum/src/coordinator_state.rs index 63f99524c8f..9707968035a 100644 --- a/trust-quorum/src/coordinator_state.rs +++ b/trust-quorum/src/coordinator_state.rs @@ -198,7 +198,6 @@ impl CoordinatorState { } CoordinatorOperation::CollectLrtqShares { .. } => {} CoordinatorOperation::Prepare { prepares, .. } => { - // prepares already filter ourself for (platform_id, prepare) in prepares.clone().into_iter() { outbox.push(Envelope { to: platform_id, diff --git a/trust-quorum/tests/coordinator.rs b/trust-quorum/tests/coordinator.rs index 13993595b66..ff9467c6655 100644 --- a/trust-quorum/tests/coordinator.rs +++ b/trust-quorum/tests/coordinator.rs @@ -189,12 +189,8 @@ impl Model { }; // Return all members of the current configuration that haven't acked a - // `PrepareMsg` yet. Exclude ourself. - - let mut waiting_for: BTreeSet<_> = - cs.msg.members.difference(acked_prepares).cloned().collect(); - waiting_for.remove(&self.id); - Some(waiting_for) + // `PrepareMsg` yet. + Some(cs.msg.members.difference(acked_prepares).cloned().collect()) } // Return the nodes that we are waiting for shares from @@ -214,15 +210,14 @@ impl Model { }; // Return all members of the last committed configuration that we - // haven't received a share from yet. Exclude ourself. - - let mut waiting_for: BTreeSet<_> = last_committed_config_msg - .members - .difference(collected_from) - .cloned() - .collect(); - waiting_for.remove(&self.id); - Some(waiting_for) + // haven't received a share from yet. + Some( + last_committed_config_msg + .members + .difference(collected_from) + .cloned() + .collect(), + ) } // Return the current epoch being coordinated, if there is one