diff --git a/trust-quorum/src/coordinator_state.rs b/trust-quorum/src/coordinator_state.rs index b810216a085..9707968035a 100644 --- a/trust-quorum/src/coordinator_state.rs +++ b/trust-quorum/src/coordinator_state.rs @@ -4,13 +4,18 @@ //! State of a reconfiguration coordinator inside a [`crate::Node`] -use crate::crypto::{LrtqShare, Sha3_256Digest, ShareDigestLrtq}; +use crate::configuration::PreviousConfiguration; +use crate::crypto::{self, LrtqShare, Sha3_256Digest, ShareDigestLrtq}; +use crate::errors::ReconfigurationError; use crate::messages::{PeerMsg, PrepareMsg}; -use crate::validators::{ReconfigurationError, ValidatedReconfigureMsg}; -use crate::{Configuration, Envelope, Epoch, PlatformId}; +use crate::validators::ValidatedReconfigureMsg; +use crate::{ + Configuration, Envelope, Epoch, PlatformId, RackSecret, Threshold, +}; use gfss::shamir::Share; -use slog::{Logger, o, warn}; +use slog::{Logger, error, info, o, warn}; use std::collections::{BTreeMap, BTreeSet}; +use std::mem; use std::time::Instant; /// The state of a reconfiguration coordinator. 
@@ -77,9 +82,16 @@ impl CoordinatorState { } let op = CoordinatorOperation::Prepare { prepares, - prepare_acks: BTreeSet::new(), + // Always include ourself + prepare_acks: BTreeSet::from([msg.coordinator_id().clone()]), }; + info!( + log, + "Starting coordination on uninitialized node"; + "epoch" => %config.epoch + ); + let state = CoordinatorState::new(log, now, msg, config, op); // Safety: Construction of a `ValidatedReconfigureMsg` ensures that @@ -94,15 +106,28 @@ impl CoordinatorState { now: Instant, msg: ValidatedReconfigureMsg, last_committed_config: &Configuration, + our_last_committed_share: Share, ) -> Result<CoordinatorState, ReconfigurationError> { let (config, new_shares) = Configuration::new(&msg)?; + info!( + log, + "Starting coordination on existing node"; + "epoch" => %config.epoch, + "last_committed_epoch" => %last_committed_config.epoch + ); + // We must collect shares from the last configuration // so we can recompute the old rack secret. let op = CoordinatorOperation::CollectShares { - epoch: last_committed_config.epoch, - members: last_committed_config.members.clone(), - collected_shares: BTreeMap::new(), + last_committed_epoch: last_committed_config.epoch, + last_committed_members: last_committed_config.members.clone(), + last_committed_threshold: last_committed_config.threshold, + // Always include ourself + collected_shares: BTreeMap::from([( + msg.coordinator_id().clone(), + our_last_committed_share, + )]), new_shares, }; @@ -146,7 +171,6 @@ impl CoordinatorState { // will return a copy of it. // // This method is "in progress" - allow unused parameters for now - #[expect(unused)] pub fn send_msgs(&mut self, now: Instant, outbox: &mut Vec<Envelope>) { if now < self.retry_deadline { return; @@ -154,13 +178,26 @@ impl CoordinatorState { self.retry_deadline = now + self.reconfigure_msg.retry_timeout(); match &self.op { CoordinatorOperation::CollectShares { - epoch, - members, + last_committed_epoch, + last_committed_members, collected_shares, .. 
- } => {} - CoordinatorOperation::CollectLrtqShares { members, shares } => {} - CoordinatorOperation::Prepare { prepares, prepare_acks } => { + } => { + // Send to all members that we haven't yet collected shares from. + for member in last_committed_members + .keys() + .filter(|&m| !collected_shares.contains_key(m)) + .cloned() + { + outbox.push(Envelope { + to: member, + from: self.reconfigure_msg.coordinator_id().clone(), + msg: PeerMsg::GetShare(*last_committed_epoch), + }); + } + } + CoordinatorOperation::CollectLrtqShares { .. } => {} + CoordinatorOperation::Prepare { prepares, .. } => { for (platform_id, prepare) in prepares.clone().into_iter() { outbox.push(Envelope { to: platform_id, @@ -205,24 +242,300 @@ impl CoordinatorState { } } } + + /// Handle a share response for the last_committed epoch. + /// + /// If we transition from collecting shares to sending prepare messages, + /// we also return our own `PrepareMsg` that must be saved as part of the + /// persistent state. + pub fn handle_share( + &mut self, + now: Instant, + outbox: &mut Vec, + from: PlatformId, + epoch: Epoch, + share: Share, + ) -> Option { + match &mut self.op { + CoordinatorOperation::CollectShares { + last_committed_epoch, + last_committed_members, + last_committed_threshold, + collected_shares, + new_shares, + } => { + // First, perform some validation on the incoming share + if *last_committed_epoch != epoch { + warn!( + self.log, + "Received Share from node with wrong epoch"; + "epoch" => %epoch, + "from" => %from + ); + return None; + } + + let Some(expected_digest) = last_committed_members.get(&from) + else { + warn!( + self.log, + "Received Share from unexpected node"; + "epoch" => %epoch, + "from" => %from + ); + return None; + }; + + let mut digest = Sha3_256Digest::default(); + share.digest::(&mut digest.0); + if digest != *expected_digest { + error!( + self.log, + "Received share with invalid digest"; + "epoch" => %epoch, + "from" => %from + ); + } + + // A valid share was 
received. Is it new? + if collected_shares.insert(from, share).is_some() { + return None; + } + // + // Do we have enough shares to recompute the rack secret + // for `epoch`? + if collected_shares.len() < last_committed_threshold.0 as usize + { + return None; + } + + // Reconstruct the old rack secret from the shares we collected. + let old_rack_secret = match RackSecret::reconstruct_from_iter( + collected_shares.values(), + ) { + Ok(old_rack_secret) => { + info!( + self.log, + "Successfully reconstructed old rack secret"; + "last_committed_epoch" => %epoch, + "epoch" => %self.configuration.epoch + ); + + old_rack_secret + } + Err(err) => { + error!( + self.log, + "Failed to reconstruct old rack secret: {err}"; + "epoch" => %epoch + ); + return None; + } + }; + + // Reconstruct the new rack secret from the new shares. + let new_rack_secret = match RackSecret::reconstruct_from_iter( + new_shares.values(), + ) { + Ok(new_rack_secret) => { + info!( + self.log, + "Successfully reconstructed new rack secret"; + "last_committed_epoch" => %epoch, + "epoch" => %self.configuration.epoch + ); + new_rack_secret + } + Err(err) => { + error!( + self.log, + "Failed to reconstruct new rack secret: {err}"; + "epoch" => %epoch + ); + return None; + } + }; + + // Encrypt our old secret with a key derived from the new secret + let (encrypted_last_committed_rack_secret, salt) = + match crypto::encrypt_old_rack_secret( + old_rack_secret, + new_rack_secret, + self.configuration.rack_id, + *last_committed_epoch, + self.configuration.epoch, + ) { + Ok(val) => val, + Err(_) => { + error!( + self.log, "Failed to encrypt old rack secret"; + "last_committed_epoch" => %epoch, + "epoch" => %self.configuration.epoch + ); + return None; + } + }; + + // Create and set our previous configuration + assert!(self.configuration.previous_configuration.is_none()); + let previous_config = PreviousConfiguration { + epoch: *last_committed_epoch, + is_lrtq: false, + encrypted_last_committed_rack_secret, + 
encrypted_last_committed_rack_secret_salt: salt, + }; + self.configuration.previous_configuration = + Some(previous_config); + + // Transition to sending `PrepareMsg`s for this configuration + let my_prepare_msg = + self.start_preparing_after_collecting_shares(now, outbox); + Some(my_prepare_msg) + } + op => { + warn!( + self.log, + "Share received when coordinator is not expecting it"; + "op" => op.name(), + "from" => %from + ); + None + } + } + } + + pub fn coordinator_status(&mut self) -> CoordinatorStatus { + (&self.op).into() + } + + // Transition from `CoordinationOperation::CollectShares` + // or `CoordinationOperation::CollectLrtqShares` to + // `CoordinationOperation::Prepare`. + // + // Return our own prepare message so it can be persisted. + // + // Panics if the current op is already `CoordinationOperation::Prepare`. + fn start_preparing_after_collecting_shares( + &mut self, + now: Instant, + outbox: &mut Vec, + ) -> PrepareMsg { + // Get the set of members in both the old and new group along with the + // shares mapped to all members in the new group. + let (existing_members, new_shares) = match &mut self.op { + CoordinatorOperation::CollectShares { + last_committed_members, + new_shares, + .. + } => { + let existing_members: BTreeSet<_> = + mem::take(last_committed_members).into_keys().collect(); + (existing_members, mem::take(new_shares)) + } + CoordinatorOperation::CollectLrtqShares { + last_committed_members, + new_shares, + .. + } => { + let existing_members: BTreeSet<_> = + mem::take(last_committed_members).into_keys().collect(); + (existing_members, mem::take(new_shares)) + } + CoordinatorOperation::Prepare { .. } => { + error!( + self.log, + "logic error: already preparing"; + "epoch" => %self.configuration.epoch, + ); + panic!( + "logic error: already preparing: epoch = {}", + self.configuration.epoch + ); + } + }; + + // Build up our set of `PrepareMsgs` + // + // `my_prepare_msg` is optional only so that we can fill it in via + // the loop. 
It will always become `Some`, as a `Configuration` always + // contains the coordinator as a member as validated by construction of + // `ValidatedReconfigureMsg`. + let mut my_prepare_msg: Option = None; + let mut prepares = BTreeMap::new(); + for (member, share) in new_shares { + if existing_members.contains(&member) { + let prepare_msg = + PrepareMsg { config: self.configuration.clone(), share }; + if *self.reconfigure_msg.coordinator_id() == member { + my_prepare_msg = Some(prepare_msg); + } else { + prepares.insert(member, prepare_msg); + } + } else { + // New members do not get sent information about the previous + // configuration. + let mut config = self.configuration.clone(); + config.previous_configuration = None; + let prepare_msg = PrepareMsg { config, share }; + prepares.insert(member, prepare_msg); + } + } + + // Actually transition to the new operation + self.op = CoordinatorOperation::Prepare { + prepares, + // Always include ourself + prepare_acks: BTreeSet::from([self + .reconfigure_msg + .coordinator_id() + .clone()]), + }; + + let last_committed_epoch = + self.configuration.previous_configuration.as_ref().unwrap().epoch; + info!( + self.log, + "Starting to prepare after collecting shares"; + "epoch" => %self.configuration.epoch, + // Safety: This whole method relies on having a previous configuration + "last_committed_epoch" => %last_committed_epoch + ); + + // Trigger sending of Prepare messages immediately + self.retry_deadline = now; + self.send_msgs(now, outbox); + + // Return our own `PrepareMsg` for persistence + // Safety: Construction of a `ValidatedReconfigureMsg` ensures that + // `my_platform_id` is part of the new configuration and has a share. + // We can therefore safely unwrap here. + my_prepare_msg.unwrap() + } } /// What should the coordinator be doing? 
pub enum CoordinatorOperation { // We haven't started implementing this yet - #[expect(unused)] CollectShares { - epoch: Epoch, - members: BTreeMap<PlatformId, Sha3_256Digest>, + last_committed_epoch: Epoch, + last_committed_members: BTreeMap<PlatformId, Sha3_256Digest>, + last_committed_threshold: Threshold, + + // Shares collected from `last_committed_members` collected_shares: BTreeMap<PlatformId, Share>, + + // New shares to be used when we get to the `Prepare` operation new_shares: BTreeMap<PlatformId, Share>, }, // We haven't started implementing this yet // Epoch is always 0 - #[allow(unused)] CollectLrtqShares { - members: BTreeMap<PlatformId, ShareDigestLrtq>, - shares: BTreeMap<PlatformId, LrtqShare>, + last_committed_members: BTreeMap<PlatformId, ShareDigestLrtq>, + last_committed_threshold: Threshold, + collected_shares: BTreeMap<PlatformId, LrtqShare>, + + // New shares to be used when we get to the `Prepare` operation + new_shares: BTreeMap<PlatformId, Share>, }, Prepare { /// The set of Prepares to send to each node @@ -244,3 +557,31 @@ impl CoordinatorOperation { } } } + +/// A summary of the coordinator's current operational status +pub enum CoordinatorStatus { + CollectShares { collected_from: BTreeSet<PlatformId> }, + CollectLrtqShares { collected_from: BTreeSet<PlatformId> }, + Prepare { acked: BTreeSet<PlatformId> }, +} + +impl From<&CoordinatorOperation> for CoordinatorStatus { + fn from(value: &CoordinatorOperation) -> Self { + match value { + CoordinatorOperation::CollectShares { + collected_shares, .. + } => CoordinatorStatus::CollectShares { + collected_from: collected_shares.keys().cloned().collect(), + }, + CoordinatorOperation::CollectLrtqShares { + collected_shares, + .. + } => CoordinatorStatus::CollectLrtqShares { + collected_from: collected_shares.keys().cloned().collect(), + }, + CoordinatorOperation::Prepare { prepare_acks, .. } => { + CoordinatorStatus::Prepare { acked: prepare_acks.clone() } + } + } + } +} diff --git a/trust-quorum/src/crypto.rs b/trust-quorum/src/crypto.rs index e5bb53cec59..c53ba8021a6 100644 --- a/trust-quorum/src/crypto.rs +++ b/trust-quorum/src/crypto.rs @@ -4,9 +4,13 @@ //! Various cryptographic constructs used by trust quroum. 
+use crate::{Epoch, Threshold}; use bootstore::trust_quorum::RackSecret as LrtqRackSecret; +use chacha20poly1305::{ChaCha20Poly1305, Key, KeyInit, aead, aead::Aead}; use derive_more::From; use gfss::shamir::{self, CombineError, SecretShares, Share, SplitError}; +use hkdf::Hkdf; +use omicron_uuid_kinds::{GenericUuid, RackUuid}; use rand::RngCore; use rand::rngs::OsRng; use secrecy::{DebugSecret, ExposeSecret, Secret}; @@ -15,9 +19,7 @@ use sha3::{Digest, Sha3_256}; use slog_error_chain::SlogInlineError; use std::fmt::Debug; use subtle::ConstantTimeEq; -use zeroize::{Zeroize, ZeroizeOnDrop}; - -use crate::Threshold; +use zeroize::{Zeroize, ZeroizeOnDrop, Zeroizing}; /// Each share contains a byte for the y-coordinate of 32 points on 32 different /// polynomials over Ed25519. All points share an x-coordinate, which is the 0th @@ -203,6 +205,15 @@ impl RackSecret { let secret = shamir::compute_secret(shares)?.try_into()?; Ok(secret) } + + pub fn reconstruct_from_iter<'a>( + shares: impl Iterator, + ) -> Result { + let mut shares: Vec = shares.cloned().collect(); + let res = RackSecret::reconstruct(&shares); + shares.zeroize(); + res + } } impl DebugSecret for RackSecret {} @@ -242,6 +253,63 @@ impl Default for Salt { } } +/// Encrypt the old rack secret with a key derived from the new rack secret. +/// +/// A random salt is generated and returned along with the encrypted secret. Key +/// derivation context includes `rack_id`, `old_epoch`, and `new_epoch`. +pub fn encrypt_old_rack_secret( + old_rack_secret: ReconstructedRackSecret, + new_rack_secret: ReconstructedRackSecret, + rack_id: RackUuid, + old_epoch: Epoch, + new_epoch: Epoch, +) -> aead::Result<(EncryptedRackSecret, Salt)> { + let salt = Salt::new(); + let cipher = derive_encryption_key_for_rack_secret( + new_rack_secret, + salt, + rack_id, + old_epoch, + new_epoch, + ); + + // This key is only used to encrypt one plaintext. A nonce of all zeroes is + // all that's required. 
+ let nonce = [0u8; 12].into(); + let encrypted_rack_secret = EncryptedRackSecret( + cipher.encrypt(&nonce, old_rack_secret.expose_secret().as_ref())?, + ); + + Ok((encrypted_rack_secret, salt)) +} + +fn derive_encryption_key_for_rack_secret( + new_rack_secret: ReconstructedRackSecret, + salt: Salt, + rack_id: RackUuid, + old_epoch: Epoch, + new_epoch: Epoch, +) -> ChaCha20Poly1305 { + let prk = Hkdf::::new( + Some(&salt.0[..]), + new_rack_secret.expose_secret(), + ); + + // The "info" string is context to bind the key to its purpose + let mut key = Zeroizing::new([0u8; 32]); + prk.expand_multi_info( + &[ + b"trust-quorum-v1-rack-secret", + rack_id.as_untyped_uuid().as_ref(), + &new_epoch.0.to_be_bytes(), + &old_epoch.0.to_be_bytes(), + ], + key.as_mut(), + ) + .unwrap(); + ChaCha20Poly1305::new(Key::from_slice(key.as_ref())) +} + #[cfg(test)] mod tests { use super::*; diff --git a/trust-quorum/src/errors.rs b/trust-quorum/src/errors.rs new file mode 100644 index 00000000000..0a30fc81f15 --- /dev/null +++ b/trust-quorum/src/errors.rs @@ -0,0 +1,103 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! 
Various errors for the trust quorum APIs + +use crate::configuration::ConfigurationError; +use crate::{Epoch, PlatformId, Threshold}; +use omicron_uuid_kinds::RackUuid; + +#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] +pub enum CommitError { + #[error("invalid rack id")] + InvalidRackId( + #[from] + #[source] + MismatchedRackIdError, + ), + + #[error("missing prepare msg")] + MissingPrepare, + + #[error("prepare for a later configuration exists")] + OutOfOrderCommit, +} + +#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] +#[error( + "sled was decommissioned on msg from {from:?} at epoch {epoch:?}: last prepared epoch = {last_prepared_epoch:?}" +)] +pub struct SledDecommissionedError { + pub from: PlatformId, + pub epoch: Epoch, + pub last_prepared_epoch: Option, +} + +#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] +#[error("mismatched rack id: expected {expected:?}, got {got:?}")] +pub struct MismatchedRackIdError { + pub expected: RackUuid, + pub got: RackUuid, +} + +#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] +pub enum ReconfigurationError { + #[error("reconfiguration coordinator must be a member of the new group")] + CoordinatorMustBeAMemberOfNewGroup, + + #[error("upgrade from LRTQ required")] + UpgradeFromLrtqRequired, + + #[error( + "number of members: {num_members:?} must be greater than threshold: {threshold:?}" + )] + ThresholdMismatch { num_members: usize, threshold: Threshold }, + + #[error( + "invalid membership size: {0:?}: must be between 3 and 32 inclusive" + )] + InvalidMembershipSize(usize), + + #[error( + "invalid threshold: {0:?}: threshold must be between 2 and 31 inclusive" + )] + InvalidThreshold(Threshold), + + #[error( + "Node has last committed epoch of {node_epoch:?}, message contains {msg_epoch:?}" + )] + LastCommittedEpochMismatch { + node_epoch: Option, + msg_epoch: Option, + }, + + #[error( + "sled has already prepared a request at epoch {existing:?}, and cannot prepare another at a smaller or 
equivalent epoch {new:?}" + )] + PreparedEpochMismatch { existing: Epoch, new: Epoch }, + + #[error("invalid rack id in reconfigure msg")] + InvalidRackId( + #[from] + #[source] + MismatchedRackIdError, + ), + + #[error("cannot reconfigure a decommissioned sled")] + DecommissionedSled( + #[from] + #[source] + SledDecommissionedError, + ), + #[error( + "reconfiguration in progress at epoch {current_epoch:?}: cannot reconfigure for older epoch {msg_epoch:?}" + )] + ReconfigurationInProgress { current_epoch: Epoch, msg_epoch: Epoch }, + + #[error("mismatched reconfiguration requests for epoch {0:?}")] + MismatchedReconfigurationForSameEpoch(Epoch), + + #[error(transparent)] + Configuration(#[from] ConfigurationError), +} diff --git a/trust-quorum/src/lib.rs b/trust-quorum/src/lib.rs index b1dfe75d553..d1c72215980 100644 --- a/trust-quorum/src/lib.rs +++ b/trust-quorum/src/lib.rs @@ -15,6 +15,7 @@ use serde::{Deserialize, Serialize}; mod configuration; mod coordinator_state; pub(crate) mod crypto; +pub(crate) mod errors; mod messages; mod node; mod persistent_state; @@ -40,6 +41,13 @@ pub use persistent_state::{PersistentState, PersistentStateSummary}; )] pub struct Epoch(pub u64); +impl Epoch { + // Increment the epoch and return the new value + pub fn inc(&self) -> Epoch { + Epoch(self.0 + 1) + } +} + /// The number of shares required to reconstruct the rack secret /// /// Typically referred to as `k` in the docs diff --git a/trust-quorum/src/node.rs b/trust-quorum/src/node.rs index a65e5e858df..daf60d0f0db 100644 --- a/trust-quorum/src/node.rs +++ b/trust-quorum/src/node.rs @@ -4,12 +4,15 @@ //! 
A trust quorum node that implements the trust quorum protocol -use crate::validators::{ReconfigurationError, ValidatedReconfigureMsg}; +use crate::errors::{CommitError, MismatchedRackIdError, ReconfigurationError}; +use crate::validators::ValidatedReconfigureMsg; use crate::{ CoordinatorState, Envelope, Epoch, PersistentState, PlatformId, messages::*, }; +use gfss::shamir::Share; +use omicron_uuid_kinds::RackUuid; -use slog::{Logger, o, warn}; +use slog::{Logger, error, info, o, warn}; use std::time::Instant; /// An entity capable of participating in trust quorum @@ -76,6 +79,88 @@ impl Node { Ok(persistent_state) } + /// Commit the configuration for the given epoch + pub fn commit_reconfiguration( + &mut self, + epoch: Epoch, + rack_id: RackUuid, + ) -> Result<Option<PersistentState>, CommitError> { + if self.persistent_state.last_committed_epoch() == Some(epoch) { + // Idempotent request + return Ok(None); + } + + // Only commit if we have a `PrepareMsg` and it's the latest `PrepareMsg`. + // + // This forces a global ordering of `PrepareMsg`s, because it's only + // possible to re-derive a key share in a `PrepareMsg` for the current + // configuration. + // + // In practice this check will always succeed if we have the + // `PrepareMsg` for this epoch, because later `Prepare` messages would + // not have been accepted before this commit arrived. This is because + // each `PrepareMsg` contains the `last_committed_epoch` that must have + // been seen in order to be accepted. If this commit hadn't occurred, + // then it wasn't part of the chain of `last_committed_epoch`s, and was + // abandoned/canceled. In that case, if we ended up getting a commit, + // then it would not inductively have been part of the existing chain + // and so would be a bug in the protocol execution. Because of this we + // check and error on this condition. 
+ let last_prepared_epoch = self.persistent_state.last_prepared_epoch(); + if last_prepared_epoch != Some(epoch) { + error!( + self.log, + "Commit message occurred out of order"; + "epoch" => %epoch, + "last_prepared_epoch" => ?last_prepared_epoch + ); + return Err(CommitError::OutOfOrderCommit); + } + if let Some(prepare) = self.persistent_state.prepares.get(&epoch) { + if prepare.config.rack_id != rack_id { + error!( + self.log, + "Commit attempted with invalid rack_id"; + "expected" => %prepare.config.rack_id, + "got" => %rack_id + ); + return Err(CommitError::InvalidRackId( + MismatchedRackIdError { + expected: prepare.config.rack_id, + got: rack_id, + }, + )); + } + } else { + // This is an erroneous commit attempt from nexus. Log it. + // + // Nexus should instead tell this node to retrieve a `Prepare` + // from another node that has already committed. + error!( + self.log, + "tried to commit a configuration, but missing prepare msg"; + "epoch" => %epoch + ); + return Err(CommitError::MissingPrepare); + } + + info!(self.log, "Committed configuration"; "epoch" => %epoch); + + // Are we currently coordinating for this epoch? + // Stop coordinating if we are + if self.coordinator_state.is_some() { + info!( + self.log, + "Stopping coordination due to commit"; + "epoch" => %epoch + ); + self.coordinator_state = None; + } + + self.persistent_state.commits.insert(epoch); + Ok(Some(self.persistent_state.clone())) + } + /// Process a timer tick /// /// Ticks are issued by the caller in order to move the protocol forward. 
@@ -87,8 +172,8 @@ impl Node { /// Handle a message from another node pub fn handle( &mut self, - _now: Instant, - _outbox: &mut Vec, + now: Instant, + outbox: &mut Vec, from: PlatformId, msg: PeerMsg, ) -> Option { @@ -97,6 +182,9 @@ impl Node { self.handle_prepare_ack(from, epoch); None } + PeerMsg::Share { epoch, share } => { + self.handle_share(now, outbox, from, epoch, share) + } _ => todo!( "cannot handle message variant yet - not implemented: {msg:?}" ), @@ -108,6 +196,7 @@ impl Node { self.coordinator_state.as_ref() } + // Handle receipt of a `PrepareAck` message fn handle_prepare_ack(&mut self, from: PlatformId, epoch: Epoch) { // Are we coordinating for this epoch? if let Some(cs) = &mut self.coordinator_state { @@ -133,6 +222,39 @@ impl Node { } } + // Handle receipt of a `Share` message + fn handle_share( + &mut self, + now: Instant, + outbox: &mut Vec, + from: PlatformId, + epoch: Epoch, + share: Share, + ) -> Option { + // Are we coordinating and expecting shares for `epoch`, which must + // be the last committed epoch? 
+ if let Some(cs) = &mut self.coordinator_state { + if let Some(my_prepare_msg) = + cs.handle_share(now, outbox, from, epoch, share) + { + // Add the prepare to our `PersistentState` + self.persistent_state + .prepares + .insert(my_prepare_msg.config.epoch, my_prepare_msg); + + return Some(self.persistent_state.clone()); + } + } else { + warn!( + self.log, + "Received share when not coordinating"; + "from" => %from, + "epoch" => %epoch + ); + } + None + } + // Send any required messages as a reconfiguration coordinator fn send_coordinator_msgs( &mut self, @@ -175,7 +297,9 @@ impl Node { return Ok(Some(self.persistent_state.clone())); } - // We have a committed configuration that is not LRTQ + // Safety: We have a committed configuration that is not LRTQ + let our_share = + self.persistent_state.last_committed_key_share().unwrap(); let config = self.persistent_state.last_committed_configuration().unwrap(); @@ -184,6 +308,7 @@ impl Node { now, msg, &config, + our_share, )?); Ok(None) diff --git a/trust-quorum/src/persistent_state.rs b/trust-quorum/src/persistent_state.rs index 32aceebca4d..a3d1461454c 100644 --- a/trust-quorum/src/persistent_state.rs +++ b/trust-quorum/src/persistent_state.rs @@ -7,13 +7,13 @@ //! Note that this state is not necessarily directly serialized and saved. use crate::crypto::LrtqShare; -use crate::messages::{CommitMsg, PrepareMsg}; +use crate::messages::PrepareMsg; use crate::{Configuration, Epoch, PlatformId}; use bootstore::schemes::v0::SharePkgCommon as LrtqShareData; use gfss::shamir::Share; use omicron_uuid_kinds::{GenericUuid, RackUuid}; use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; /// All the persistent state for this protocol #[derive(Debug, Clone, Serialize, Deserialize, Default)] @@ -22,7 +22,7 @@ pub struct PersistentState { // data it read from disk. This allows us to upgrade from LRTQ. 
pub lrtq: Option<LrtqShareData>, pub prepares: BTreeMap<Epoch, PrepareMsg>, - pub commits: BTreeMap<Epoch, CommitMsg>, + pub commits: BTreeSet<Epoch>, // Has the node seen a commit for an epoch higher than it's current // configuration for which it has not received a `PrepareMsg` for? If at @@ -37,7 +37,7 @@ impl PersistentState { PersistentState { lrtq: None, prepares: BTreeMap::new(), - commits: BTreeMap::new(), + commits: BTreeSet::new(), decommissioned: None, } } @@ -66,7 +66,7 @@ impl PersistentState { pub fn last_committed_epoch(&self) -> Option<Epoch> { - self.commits.keys().last().map(|epoch| *epoch) + self.commits.last().map(|epoch| *epoch) } // Get the configuration for the current epoch from its prepare message @@ -88,7 +88,7 @@ impl PersistentState { // Return the key share for the latest committed trust quorum configuration // if one exists - pub fn key_share(&self) -> Option<Share> { + pub fn last_committed_key_share(&self) -> Option<Share> { self.last_committed_epoch().map(|epoch| { self.prepares.get(&epoch).expect("missing prepare").share.clone() }) diff --git a/trust-quorum/src/validators.rs b/trust-quorum/src/validators.rs index 548377a7ff6..8ec4a5d5a55 100644 --- a/trust-quorum/src/validators.rs +++ b/trust-quorum/src/validators.rs @@ -4,7 +4,9 @@ //! 
Various validation functions to be used by a [`crate::Node`] -use crate::configuration::ConfigurationError; +use crate::errors::{ + MismatchedRackIdError, ReconfigurationError, SledDecommissionedError, +}; use crate::messages::ReconfigureMsg; use crate::{Epoch, PersistentStateSummary, PlatformId, Threshold}; use omicron_uuid_kinds::RackUuid; @@ -44,84 +46,6 @@ fn check_in_service( Ok(()) } -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] -#[error( - "sled was decommissioned on msg from {from:?} at epoch {epoch:?}: last prepared epoch = {last_prepared_epoch:?}" -)] -pub struct SledDecommissionedError { - from: PlatformId, - epoch: Epoch, - last_prepared_epoch: Option, -} - -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] -#[error("mismatched rack id: expected {expected:?}, got {got:?}")] -pub struct MismatchedRackIdError { - pub expected: RackUuid, - pub got: RackUuid, -} - -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] -pub enum ReconfigurationError { - #[error("reconfiguration coordinator must be a member of the new group")] - CoordinatorMustBeAMemberOfNewGroup, - - #[error("upgrade from LRTQ required")] - UpgradeFromLrtqRequired, - - #[error( - "number of members: {num_members:?} must be greater than threshold: {threshold:?}" - )] - ThresholdMismatch { num_members: usize, threshold: Threshold }, - - #[error( - "invalid membership size: {0:?}: must be between 3 and 32 inclusive" - )] - InvalidMembershipSize(usize), - - #[error( - "invalid threshold: {0:?}: threshold must be between 2 and 31 inclusive" - )] - InvalidThreshold(Threshold), - - #[error( - "Node has last committed epoch of {node_epoch:?}, message contains {msg_epoch:?}" - )] - LastCommittedEpochMismatch { - node_epoch: Option, - msg_epoch: Option, - }, - - #[error( - "sled has already prepared a request at epoch {existing:?}, and cannot prepare another at a smaller or equivalent epoch {new:?}" - )] - PreparedEpochMismatch { existing: Epoch, new: Epoch }, - - 
#[error("invalid rack id in reconfigure msg")] - InvalidRackId( - #[from] - #[source] - MismatchedRackIdError, - ), - - #[error("cannot reconfigure a decommissioned sled")] - DecommissionedSled( - #[from] - #[source] - SledDecommissionedError, - ), - #[error( - "reconfiguration in progress at epoch {current_epoch:?}: cannot reconfigure for older epoch {msg_epoch:?}" - )] - ReconfigurationInProgress { current_epoch: Epoch, msg_epoch: Epoch }, - - #[error("mismatched reconfiguration requests for epoch {0:?}")] - MismatchedReconfigurationForSameEpoch(Epoch), - - #[error(transparent)] - Configuration(#[from] ConfigurationError), -} - /// A `ReconfigureMsg` that has been determined to be valid for the remainder /// of code paths. We encode this check into a type in a "parse, don't validate" /// manner. diff --git a/trust-quorum/tests/coordinator.rs b/trust-quorum/tests/coordinator.rs index d9bb0237f8f..ff9467c6655 100644 --- a/trust-quorum/tests/coordinator.rs +++ b/trust-quorum/tests/coordinator.rs @@ -6,6 +6,7 @@ use assert_matches::assert_matches; use bcs::Result; +use gfss::shamir::Share; use omicron_test_utils::dev::test_setup_log; use omicron_uuid_kinds::RackUuid; use prop::sample::Index; @@ -49,6 +50,9 @@ impl Sut { /// the SUT node itself to understand external events like messages that got /// sent in response to a tick. pub struct Model { + // A copy of our id + pub id: PlatformId, + /// The current time pub now: Instant, @@ -60,8 +64,9 @@ pub struct Model { } impl Model { - pub fn new() -> Model { + pub fn new(coordinator_id: PlatformId) -> Model { Model { + id: coordinator_id, now: Instant::now(), last_committed_config_msg: None, coordinator_state: None, @@ -91,11 +96,9 @@ impl Model { // We don't currently collect LRTQ shares. This conditional // will have to be more specific when we do. 
let op = if self.last_committed_config_msg.is_some() { - ModelCoordinatorOp::CollectShares { - collected_from: BTreeSet::new(), - } + ModelCoordinatorOp::new_collect_shares(self.id.clone()) } else { - ModelCoordinatorOp::Prepare { acked: BTreeSet::new() } + ModelCoordinatorOp::new_prepare(self.id.clone()) }; self.coordinator_state = Some(ModelCoordinatorState { @@ -105,6 +108,11 @@ impl Model { }); } + pub fn commit(&mut self, msg: ReconfigureMsg) { + self.last_committed_config_msg = Some(msg); + self.coordinator_state = None; + } + // If we are currently preparing this epoch then record the ack pub fn ack_prepare(&mut self, from: PlatformId, epoch: Epoch) { if let Some(cs) = &mut self.coordinator_state { @@ -114,6 +122,47 @@ impl Model { } } + /// If we are currently waiting for shares in this epoch then record the + /// responder. + /// + /// If we've received enough shares to recompute the rack secret then + /// transition the model operation to `Prepare` and return `Ok(true)`. + pub fn handle_share( + &mut self, + from: PlatformId, + epoch: Epoch, + ) -> Result { + let Some(last_committed_config_msg) = &self.last_committed_config_msg + else { + return Ok(false); + }; + let Some(cs) = &mut self.coordinator_state else { + return Ok(false); + }; + + // Make sure our model state is consistent + prop_assert_eq!( + Some(last_committed_config_msg.epoch), + cs.msg.last_committed_epoch + ); + + // Record the share + if last_committed_config_msg.epoch == epoch { + cs.op.handle_share(from); + } + + // Do we have a threshold of shares? If so, we should transition our + // operation to `Prepare`. + if cs.op.collected_shares().unwrap_or(&BTreeSet::new()).len() + == last_committed_config_msg.threshold.0 as usize + { + cs.op = ModelCoordinatorOp::new_prepare(self.id.clone()); + return Ok(true); + } + + Ok(false) + } + /// Return nodes that have acked prepares for an ongoing coordination /// by the Model. 
/// @@ -122,6 +171,60 @@ impl Model { self.coordinator_state.as_ref().and_then(|cs| cs.op.acked_prepares()) } + /// Return the nodes that we are waiting for prepare acknowledgements from + pub fn waiting_for_prepare_acks_from( + &self, + ) -> Option> { + // We need to actually be coordinating to wait for shares + let Some(cs) = &self.coordinator_state else { + return None; + }; + // We need to actually be waiting for acks + let Some(acked_prepares) = self + .coordinator_state + .as_ref() + .and_then(|cs| cs.op.acked_prepares()) + else { + return None; + }; + + // Return all members of the current configuration that haven't acked a + // `PrepareMsg` yet. + Some(cs.msg.members.difference(acked_prepares).cloned().collect()) + } + + // Return the nodes that we are waiting for shares from + pub fn waiting_for_shares_from(&self) -> Option> { + // We only wait for shares if we have a prior committed config + let Some(last_committed_config_msg) = &self.last_committed_config_msg + else { + return None; + }; + // We need to actually be coordinating to wait for shares + let Some(cs) = &self.coordinator_state else { + return None; + }; + // We need to actually be collecting shares as a coordinator + let Some(collected_from) = cs.op.collected_shares() else { + return None; + }; + + // Return all members of the last committed configuration that we + // haven't received a share from yet. + Some( + last_committed_config_msg + .members + .difference(collected_from) + .cloned() + .collect(), + ) + } + + // Return the current epoch being coordinated, if there is one + fn coordinating_epoch(&self) -> Option { + self.coordinator_state.as_ref().map(|cs| cs.msg.epoch) + } + /// Return the members of the current reconfiguration being coordinated. /// /// Return the empty set if there is no ongoing coordination. 
@@ -149,6 +252,20 @@ pub enum ModelCoordinatorOp { } impl ModelCoordinatorOp { + pub fn new_collect_shares(ourself: PlatformId) -> Self { + ModelCoordinatorOp::CollectShares { + // Always include ourself + collected_from: BTreeSet::from([ourself]), + } + } + + pub fn new_prepare(ourself: PlatformId) -> Self { + ModelCoordinatorOp::Prepare { + // Always include ourself + acked: BTreeSet::from([ourself]), + } + } + pub fn ack_prepare(&mut self, from: PlatformId) { if let ModelCoordinatorOp::Prepare { acked } = self { acked.insert(from); @@ -162,6 +279,32 @@ impl ModelCoordinatorOp { None } } + + pub fn handle_share(&mut self, from: PlatformId) { + if let ModelCoordinatorOp::CollectShares { collected_from } = self { + collected_from.insert(from); + } + } + + pub fn collected_shares(&self) -> Option<&BTreeSet> { + if let ModelCoordinatorOp::CollectShares { collected_from } = self { + Some(collected_from) + } else { + None + } + } + + pub fn is_collecting_shares(&self) -> bool { + matches!(self, ModelCoordinatorOp::CollectShares { .. }) + } + + pub fn is_collecting_lrtq_shares(&self) -> bool { + matches!(self, ModelCoordinatorOp::CollectLrtqShares { .. }) + } + + pub fn is_preparing(&self) -> bool { + matches!(self, ModelCoordinatorOp::Prepare { .. }) + } } // The test itself maintains an internal state that includes not only the state @@ -169,37 +312,69 @@ impl ModelCoordinatorOp { // tests via `Action`s. struct TestState { /// Our system under test - pub sut: Sut, + sut: Sut, /// The abstract model of our SUT - pub model: Model, + model: Model, // All in flight messages to nodes that are not the SUT - pub network_msgs: BTreeMap>, + network_msgs: BTreeMap>, // Messages delivered to each node that is not the SUT - pub delivered_msgs: BTreeMap>, + delivered_msgs: BTreeMap>, + + // Shares delivered to nodes that are not the SUT. + // + // We track these so that we can implement replies for `GetShare` messages. 
+ delivered_shares: BTreeMap<(Epoch, PlatformId), Share>, + + // A cache of our member universe, so we only have to generate it once + member_universe: Vec, + + // The last epoch for a `PrepareMsg` sent by Nexus + // In production this is stored in CRDB, but it is simulated here + highest_prepared_epoch: Epoch, + + // We reuse the same coordinator ID across all test runs + // There's no reason to randomize it. + coordinator_id: PlatformId, + + // No reason to change the rack_id + rack_id: RackUuid, } impl TestState { - pub fn new(log: Logger, coordinator_id: PlatformId) -> TestState { + pub fn new(log: Logger) -> TestState { + let member_universe = member_universe(); + let coordinator_id = member_universe[0].clone(); TestState { sut: Sut { - node: Node::new(log, coordinator_id, PersistentState::empty()), + node: Node::new( + log, + coordinator_id.clone(), + PersistentState::empty(), + ), persistent_state: PersistentState::empty(), }, - model: Model::new(), + model: Model::new(coordinator_id.clone()), network_msgs: BTreeMap::new(), delivered_msgs: BTreeMap::new(), + delivered_shares: BTreeMap::new(), + member_universe, + highest_prepared_epoch: Epoch(0), + coordinator_id, + rack_id: RackUuid::new_v4(), } } pub fn action_coordinate_reconfiguration( &mut self, - msg: ReconfigureMsg, + generated_config: GeneratedConfiguration, ) -> Result<(), TestCaseError> { let mut outbox = Vec::new(); + let msg = self.generated_config_to_reconfigure_msg(generated_config); + // Update the model state self.model.action_coordinate_reconfiguration(msg.clone()); @@ -233,9 +408,15 @@ impl TestState { self.send(outbox.into_iter()); } None => { - // The request is idempotent - // No action should have been taken - prop_assert!(outbox.is_empty()); + // This is a reconfiguration, so before a `PersistentState` + // is returned we must collect share for the last committed + // configuration. 
+ self.assert_envelopes_after_coordinate_reconfiguration( + &outbox, + )?; + + // We validated our messages. Let's put them into our test state as "in-flight". + self.send(outbox.into_iter()); } } @@ -252,12 +433,21 @@ impl TestState { Action::DeliverMsgs(indices) => { self.action_deliver_msgs(indices); } + Action::DropMessages(indices) => { + self.action_drop_msgs(indices); + } Action::Reply(indices) => { self.action_reply(indices)?; } Action::Tick(time_jump) => { self.action_tick(time_jump)?; } + Action::Commit(index) => { + self.action_commit(index)?; + } + Action::CoordinateReconfiguration(generated_config) => { + self.action_coordinate_reconfiguration(generated_config)?; + } } } Ok(()) @@ -273,6 +463,14 @@ impl TestState { for index in indices { let id = index.get(&destinations); if let Some(msg) = self.network_msgs.get_mut(id).unwrap().pop() { + // If the message is a `Prepare`, also save its share so we can + // reply to `GetShare` messages. + if let PeerMsg::Prepare(prepare) = &msg { + self.delivered_shares.insert( + (prepare.config.epoch, id.clone()), + prepare.share.clone(), + ); + } let msgs = self.delivered_msgs.entry(id.clone()).or_default(); msgs.push(msg); } @@ -282,6 +480,22 @@ impl TestState { self.network_msgs.retain(|_, msgs| !msgs.is_empty()); } + // Drop network messages destined for generated destinations + fn action_drop_msgs(&mut self, indices: Vec) { + let destinations: Vec<_> = self.network_msgs.keys().cloned().collect(); + if destinations.is_empty() { + // nothing to do + return; + } + for index in indices { + let id = index.get(&destinations); + let _ = self.network_msgs.get_mut(id).unwrap().pop(); + } + + // Remove any destinations with zero messages in-flight + self.network_msgs.retain(|_, msgs| !msgs.is_empty()); + } + /// Send replies from nodes with delivered messages fn action_reply( &mut self, @@ -303,6 +517,9 @@ impl TestState { PeerMsg::Prepare(prepare_msg) => { self.reply_to_prepare_msg(from.clone(), prepare_msg)?; } + 
PeerMsg::GetShare(epoch) => { + self.reply_to_get_share_msg(from.clone(), epoch)?; + } _ => todo!(), } } @@ -327,35 +544,142 @@ impl TestState { // If time has advanced past the coordinator's retry deadline // then we must see if we expected any retries to be sent. if timer_expired { - // Get the members of the current configuration being coordinated - let members = self.model.active_reconfiguration_members(); + self.assert_expected_outgoing_envelopes(&outbox)?; + } + + // Put any output messages onto the network + self.send(outbox.into_iter()); - // We aren't coordinating - if members.is_empty() { - prop_assert!(outbox.is_empty()); - return Ok(()); + Ok(()) + } + + fn assert_expected_outgoing_envelopes( + &self, + outbox: &[Envelope], + ) -> Result<(), TestCaseError> { + if let Some(expected) = self.model.waiting_for_prepare_acks_from() { + for envelope in outbox { + assert_matches!(envelope.msg, PeerMsg::Prepare(_)); + prop_assert!(expected.contains(&envelope.to)); } + // We are only sending either `Prepare` or `GetShare` variants, not both. + return Ok(()); + } - if let Some(acked_members) = self.model.acked_prepares() { - // We expect retries for all members that the coordinator - // has not received acks for. - let expected: BTreeSet<_> = - members.difference(acked_members).collect(); - for envelope in &outbox { - prop_assert!(expected.contains(&envelope.to)); - } - } else { - // We aren't waiting on acks, so won't retry sending prepares - prop_assert!(outbox.is_empty()); + if let Some(expected) = self.model.waiting_for_shares_from() { + for envelope in outbox { + assert_matches!(envelope.msg, PeerMsg::GetShare(_)); + prop_assert!(expected.contains(&envelope.to)); } } - // Put any output messages onto the network - self.send(outbox.into_iter()); + Ok(()) + } + + // Precondition: Nexus will only actually commit if enough nodes + // have acked the `PrepareMsg` from the coordinator. Let's simulate + // that by checking our model state. 
+ // + // Return `Some(reconfigure_msg)` if Nexus would decide to commit, `None` otherwise. + fn action_commit_precondition( + &mut self, + index: Index, + ) -> Option { + // If we aren't currently coordinating, then just return as there's no + // configuration to commit. + let Some(cs) = &self.model.coordinator_state else { + return None; + }; + + // Compute the safety threshold used by nexus + // Nexus will choose a constant, but we vary it because we can. + let max_possible_z = cs.msg.members.len() - cs.msg.threshold.0 as usize; + let z = + if max_possible_z == 0 { 0 } else { index.index(max_possible_z) }; + + // We can only commit when the coordinator has seen a `PrepareAck` from + // K + Z nodes. + // + // Justification: The coordinator does not know what `Z` is and cannot + // know if it has seen enough acks to know if it's safe to commit, so + // nexus must not try. Furthermore, Nexus informs non-coordinator nodes + // when to commit and they have no way to know if enough nodes have + // shares as a result of receiving `PrepareMsg`s. Because of this, we + // rely on Nexus to reliably uphold this correctness invariant, and + // simulate it in the test. + let acked = cs.op.acked_prepares().map(|s| s.len()).unwrap_or(0); + if acked >= cs.msg.threshold.0 as usize + z { + Some(cs.msg.clone()) + } else { + None + } + } + + fn action_commit(&mut self, index: Index) -> Result<(), TestCaseError> { + let Some(msg) = self.action_commit_precondition(index) else { + // We failed our precondition check. This isn't a test failure. It + // just means that the coordinator is not ready to commit. + return Ok(()); + }; + + // Commit the configuration at our node + let persistent_state = self + .sut + .node + .commit_reconfiguration(msg.epoch, msg.rack_id)? 
+ .expect("persistent state has been updated"); + + // Check the output of the persistent state + self.assert_persistent_state_after_commit_reconfiguration( + msg.epoch, + &persistent_state, + )?; + + // Save the persistent state + self.sut.persistent_state = persistent_state; + + // Update our model state + self.model.commit(msg); Ok(()) } + fn generated_config_to_reconfigure_msg( + &mut self, + config: GeneratedConfiguration, + ) -> ReconfigureMsg { + let mut members: BTreeSet = config + .members + .iter() + .map(|index| self.member_universe[*index].clone()) + .collect(); + + // Ensure that the configuration always includes the coordinator + members.insert(self.coordinator_id.clone()); + + let threshold = Threshold(usize::max( + 2, + config.threshold.index(members.len()), + ) as u8); + + // Increment the epoch for this configuration. This simulates what Nexus + // would do. + self.highest_prepared_epoch = self.highest_prepared_epoch.inc(); + + ReconfigureMsg { + rack_id: self.rack_id, + epoch: self.highest_prepared_epoch, + last_committed_epoch: self + .model + .last_committed_config_msg + .as_ref() + .map(|msg| msg.epoch), + members, + threshold, + retry_timeout: Duration::from_millis(RETRY_TIMEOUT_MS), + } + } + fn reply_to_prepare_msg( &mut self, from: PlatformId, @@ -384,10 +708,87 @@ impl TestState { // Ensure that if the SUT is waiting for prepares, that the ack // is accounted for. - if let Some(acks) = self.model.acked_prepares() { - prop_assert!(acks.contains(&from)); + // + // We only perform this check if this reply is for the current epoch + if self.model.coordinating_epoch() == Some(msg.config.epoch) { + if let Some(acks) = self.model.acked_prepares() { + prop_assert!(acks.contains(&from)); + } + } + + Ok(()) + } + + fn reply_to_get_share_msg( + &mut self, + from: PlatformId, + epoch: Epoch, + ) -> Result<(), TestCaseError> { + // If we have a share, then return it. Otherwise return nothing. + // In the future we may implement other behaviors. 
+ let Some(share) = self.delivered_shares.get(&(epoch, from.clone())) + else { + return Ok(()); + }; + let reply = PeerMsg::Share { epoch, share: share.clone() }; + let mut outbox = Vec::new(); + let output = self.sut.node.handle( + self.model.now, + &mut outbox, + from.clone(), + reply, + ); + + // If we just received a threshold number of shares, we expect + // reconstruction of the rack secret for the last committed + // configuration and the SUT to start sending prepare messages. As part + // of this transition the persistent state with the SUT's own latest + // `PrepareMsg` is also returned. + let start_preparing = self.model.handle_share(from.clone(), epoch)?; + if start_preparing { + // We just transitioned to preparing + let Some(persistent_state) = output else { + return Err(TestCaseError::fail( + "Persistent state should exist", + )); + }; + + // The same checks should hold. + // TODO: Perhaps we should rename this method. + self.assert_persistent_state_after_coordinate_reconfiguration( + &persistent_state, + )?; + + // We validated our persistent state is correct. Save it and move + // on. + self.sut.persistent_state = persistent_state; } + self.assert_expected_outgoing_envelopes(&outbox)?; + + self.send(outbox.into_iter()); + + Ok(()) + } + + /// Ensure that the output of `Node::commit_reconfiguration` is valid + /// given the test state. + fn assert_persistent_state_after_commit_reconfiguration( + &self, + epoch: Epoch, + persistent_state: &PersistentState, + ) -> Result<(), TestCaseError> { + // We should have one new commit for epoch: Epoch + prop_assert_eq!( + self.sut.persistent_state.commits.len() + 1, + persistent_state.commits.len() + ); + assert!(!self.sut.persistent_state.commits.contains(&epoch)); + assert!(persistent_state.commits.contains(&epoch)); + + // The SUT node should no longer be coordinating + assert!(self.sut.node.get_coordinator_state().is_none()); + Ok(()) } @@ -409,6 +810,7 @@ impl TestState { ))? 
.msg; + // TODO: This will have to change once we start supporting LRTQ prop_assert!(persistent_state.lrtq.is_none()); prop_assert_eq!( &sut.persistent_state.commits, @@ -447,37 +849,58 @@ impl TestState { outbox: &[Envelope], ) -> Result<(), TestCaseError> { let sut = &self.sut; - let msg = &self - .model - .coordinator_state - .as_ref() - .ok_or(TestCaseError::fail( - "Model should have a coordinator state", - ))? - .msg; - - let config = sut.persistent_state.configuration(msg.epoch).unwrap(); - - // Ensure the members of the configuration match the model msg - prop_assert_eq!( - &msg.members, - &config.members.keys().cloned().collect() - ); + let model_cs = &self.model.coordinator_state.as_ref().ok_or( + TestCaseError::fail("Model should have a coordinator state"), + )?; // The coordinator should send messages to every node but itself - assert_eq!(outbox.len(), config.members.len() - 1); for envelope in outbox { - assert_matches!( - &envelope.msg, - PeerMsg::Prepare(PrepareMsg { config: prepare_config, .. }) => { - assert_eq!(*config, *prepare_config); - } - ); - prop_assert_eq!(&envelope.from, &config.coordinator); + if model_cs.op.is_preparing() { + // We should be sending to every member of the current config + // except ourself. + assert_eq!(outbox.len(), model_cs.msg.members.len() - 1); + + // Safety: We'll always have this configuration in the SUT persistent + // state if we are preparing. + let config = sut + .persistent_state + .configuration(model_cs.msg.epoch) + .expect("config for this epoch: {epoch}"); + + assert_matches!( + &envelope.msg, + PeerMsg::Prepare(PrepareMsg { config: prepare_config, .. }) => { + assert_eq!(*config, *prepare_config); + } + ); + } else { + // We should be sending to every member of the previous config + // except ourself. + // + // Safety: If we got here, then we have a prior committed + // configuration. 
+ assert_eq!(
+ outbox.len(),
+ self.model
+ .last_committed_config_msg
+ .as_ref()
+ .unwrap()
+ .members
+ .len()
+ - 1
+ );
+ assert_matches!(
+ &envelope.msg,
+ PeerMsg::GetShare(epoch) => {
+ assert_eq!(*epoch, model_cs.msg.last_committed_epoch.unwrap());
+ }
+ );
+ }
 prop_assert_eq!(&envelope.from, &self.coordinator_id);

 // We don't send a prepare to ourselves. We put it in our
 // `PersistentState` directly.
- prop_assert_ne!(&envelope.to, &config.coordinator);
+ prop_assert_ne!(&envelope.to, &self.coordinator_id);
 }

 Ok(())
 }

@@ -514,7 +937,7 @@ pub enum Action {
 /// use a Vec. We don't worry about shuffling messages so they are out of
 /// order, because we use stream oriented transport.
 //#[weight(3)]
- //DropMessages(Vec),
+ DropMessages(Vec),

 /// Fake a reply for a delivered message.
 ///
@@ -522,12 +945,13 @@ pub enum Action {
 /// `test_state.delivered_msgs`.
 #[weight(45)]
 Reply(Vec),
+
 // Start a reconfiguration that has a possibility of succeeding
 // given delivered messages.
 //
 // It will never fail a validation check.
 //#[weight(2)]
- //StartReconfiguration(GeneratedConfiguration),
+ CoordinateReconfiguration(GeneratedConfiguration),

 // A tick duration in milliseconds
 #[weight(50)]
@@ -538,6 +962,17 @@ pub enum Action {
 )]
 Duration,
 ),
+
+ // Attempt to Commit a reconfiguration if possible
+ //
+ // We'll only attempt to commit a reconfiguration if K+Z nodes
+ // have acked prepares for that epoch.
+ //
+ // Z is randomly chosen between 0 and N-K, and is a "safety parameter"
+ // described in RFD 238.
+ //
+ // `Index` is used to compute Z here.
+ Commit(Index),
 }

 /// Information about configurations used at test generation time
@@ -552,7 +987,13 @@ pub struct GeneratedConfiguration {
 /// still be duplicated due to the shift implementation used. Therefore we
 /// instead just choose from a constrained set of usize values that we can
 /// use directly as indexes into our fixed size structure for all tests.
- #[strategy(btree_set(0..=255usize, MIN_CLUSTER_SIZE..=MAX_CLUSTER_SIZE))]
+ ///
+ /// Note that we intentionally set the max set size to MAX_CLUSTER_SIZE-1.
+ /// This is because we always want to include the coordinator in the
+ /// configuration, but its value may not be chosen randomly. In this case,
+ /// we have to add it to the actual membership set we generate from this
+ /// configuration with [`TestState::generated_config_to_reconfigure_msg`].
+ #[strategy(btree_set(0..=255usize, MIN_CLUSTER_SIZE..MAX_CLUSTER_SIZE))]
 pub members: BTreeSet,

 /// An index is roughly equivalent to a threshold, since a threshold cannot
@@ -584,60 +1025,13 @@ pub struct TestInput {

 fn test_coordinator_behavior_from_empty_state(input: TestInput) {
 let logctx = test_setup_log("coordinator_behavior_from_empty_state");

- let universe = member_universe();
- let rack_id = RackUuid::new_v4();
-
- // We don't know the coordinator ID until we translate the indexes in the
- // test input to `PlatformId`s.
- let coordinator_id = None;
- let initial_config_msg = generated_config_to_reconfigure_msg(
- &universe,
- coordinator_id,
- Epoch(1),
- rack_id,
- input.initial_config,
- );
-
- let coordinator_id = initial_config_msg.members.first().unwrap().clone();
- let mut state = TestState::new(logctx.log.clone(), coordinator_id);
+ let mut state = TestState::new(logctx.log.clone());

 // Start the initial configuration
- state.action_coordinate_reconfiguration(initial_config_msg)?;
+ state.action_coordinate_reconfiguration(input.initial_config)?;

 // Start executing the actions
 state.run_actions(input.actions)?;

 logctx.cleanup_successful();
 }
-
-fn generated_config_to_reconfigure_msg(
- member_universe: &[PlatformId],
- coordinator_id: Option<&PlatformId>,
- epoch: Epoch,
- rack_id: RackUuid,
- config: GeneratedConfiguration,
-) -> ReconfigureMsg {
- let mut members: BTreeSet = config
- .members
- .iter()
- .map(|index| member_universe[*index].clone())
- .collect();
-
- if let 
Some(coordinator_id) = coordinator_id { - members.insert(coordinator_id.clone()); - } - - println!("members = {members:#?}"); - - let threshold = - Threshold(usize::max(2, config.threshold.index(members.len())) as u8); - - ReconfigureMsg { - rack_id, - epoch, - last_committed_epoch: None, - members, - threshold, - retry_timeout: Duration::from_millis(RETRY_TIMEOUT_MS), - } -}