diff --git a/sled-agent/config-reconciler/src/handle.rs b/sled-agent/config-reconciler/src/handle.rs
index 9a29598131..cdae6a14da 100644
--- a/sled-agent/config-reconciler/src/handle.rs
+++ b/sled-agent/config-reconciler/src/handle.rs
@@ -243,7 +243,7 @@ impl ConfigReconcilerHandle {
     }
 
     /// Wait for the internal disks task to start managing the boot disk.
-    pub async fn wait_for_boot_disk(&mut self) -> DiskIdentity {
+    pub async fn wait_for_boot_disk(&mut self) -> Arc<DiskIdentity> {
         self.internal_disks_rx.wait_for_boot_disk().await
     }
 
diff --git a/sled-agent/config-reconciler/src/internal_disks.rs b/sled-agent/config-reconciler/src/internal_disks.rs
index 729f5ac337..2d6ccdc7f5 100644
--- a/sled-agent/config-reconciler/src/internal_disks.rs
+++ b/sled-agent/config-reconciler/src/internal_disks.rs
@@ -11,9 +11,14 @@
 use camino::Utf8PathBuf;
 use core::cmp;
+use futures::future;
+use futures::future::Either;
 use id_map::IdMap;
 use id_map::IdMappable;
+use omicron_common::backoff::Backoff as _;
+use omicron_common::backoff::ExponentialBackoffBuilder;
 use omicron_common::disk::DiskIdentity;
+use omicron_common::disk::DiskVariant;
 use omicron_common::disk::M2Slot;
 use omicron_common::zpool_name::ZpoolName;
 use sled_hardware::PooledDiskError;
@@ -23,25 +28,36 @@ use sled_storage::dataset::CONFIG_DATASET;
 use sled_storage::dataset::M2_ARTIFACT_DATASET;
 use sled_storage::dataset::M2_DEBUG_DATASET;
 use sled_storage::disk::Disk;
+use sled_storage::disk::DiskError;
+use sled_storage::disk::RawDisk;
 use slog::Logger;
+use slog::error;
+use slog::info;
+use slog::warn;
+use slog_error_chain::InlineErrorChain;
+use std::collections::BTreeMap;
+use std::collections::BTreeSet;
+use std::future::Future;
 use std::mem;
+use std::ops::Deref;
 use std::sync::Arc;
+use std::time::Duration;
 use tokio::sync::watch;
 use tokio::sync::watch::error::RecvError;
 
-use crate::dump_setup::DumpSetup;
 use crate::raw_disks::RawDiskWithId;
 
 /// A thin wrapper around a [`watch::Receiver`] that presents a similar API.
 #[derive(Debug, Clone)]
 pub struct InternalDisksReceiver {
     mount_config: Arc<MountConfig>,
+    errors_rx: watch::Receiver<Arc<BTreeMap<DiskIdentity, DiskError>>>,
     inner: InternalDisksReceiverInner,
 }
 
 #[derive(Debug, Clone)]
 enum InternalDisksReceiverInner {
-    Real(watch::Receiver<Arc<IdMap<InternalDiskDetails>>>),
+    Real(watch::Receiver<IdMap<InternalDisk>>),
     #[cfg(any(test, feature = "testing"))]
     FakeStatic(Arc<IdMap<InternalDiskDetails>>),
     #[cfg(any(test, feature = "testing"))]
@@ -63,7 +79,15 @@ impl InternalDisksReceiver {
             })
             .collect(),
         ));
-        Self { mount_config, inner }
+
+        // We never report errors from our static set; move the sender to a task
+        // that idles so we don't get recv errors.
+        let (errors_tx, errors_rx) = watch::channel(Arc::default());
+        tokio::spawn(async move {
+            errors_tx.closed().await;
+        });
+
+        Self { mount_config, inner, errors_rx }
     }
 
     /// Create an `InternalDisksReceiver` that forwards any changes made in the
@@ -87,6 +111,7 @@ impl InternalDisksReceiver {
             .collect();
         let (mapped_tx, mapped_rx) = watch::channel(Arc::new(current));
 
+        // Spawn the task that forwards changes from channel to channel.
         tokio::spawn(async move {
             while let Ok(()) = disks_rx.changed().await {
                 let remapped = disks_rx
@@ -103,8 +128,15 @@ impl InternalDisksReceiver {
             }
         });
 
+        // We never report errors from our static set; move the sender to a task
+        // that idles so we don't get recv errors.
+        let (errors_tx, errors_rx) = watch::channel(Arc::default());
+        tokio::spawn(async move {
+            errors_tx.closed().await;
+        });
+
         let inner = InternalDisksReceiverInner::FakeDynamic(mapped_rx);
-        Self { mount_config, inner }
+        Self { mount_config, inner, errors_rx }
     }
 
@@ -112,20 +144,39 @@ impl InternalDisksReceiver {
     pub(crate) fn spawn_internal_disks_task(
         mount_config: Arc<MountConfig>,
         raw_disks_rx: watch::Receiver<IdMap<RawDiskWithId>>,
         base_log: &Logger,
     ) -> Self {
-        let (disks_tx, disks_rx) = watch::channel(Arc::default());
+        Self::spawn_with_disk_adopter(
+            mount_config,
+            raw_disks_rx,
+            base_log,
+            RealDiskAdopter,
+        )
+    }
+
+    fn spawn_with_disk_adopter<T: DiskAdopter>(
+        mount_config: Arc<MountConfig>,
+        raw_disks_rx: watch::Receiver<IdMap<RawDiskWithId>>,
+        base_log: &Logger,
+        disk_adopter: T,
+    ) -> Self {
+        let (disks_tx, disks_rx) = watch::channel(IdMap::default());
+        let (errors_tx, errors_rx) = watch::channel(Arc::default());
         tokio::spawn(
             InternalDisksTask {
                 disks_tx,
+                errors_tx,
                 raw_disks_rx,
                 mount_config: Arc::clone(&mount_config),
-                dump_setup: DumpSetup::new(base_log, Arc::clone(&mount_config)),
                 log: base_log.new(slog::o!("component" => "InternalDisksTask")),
             }
-            .run(),
+            .run(disk_adopter),
         );
-        Self { mount_config, inner: InternalDisksReceiverInner::Real(disks_rx) }
+        Self {
+            mount_config,
+            inner: InternalDisksReceiverInner::Real(disks_rx),
+            errors_rx,
+        }
     }
 
     /// Get the current set of managed internal disks without marking the
@@ -135,7 +186,9 @@ impl InternalDisksReceiver {
     /// owned value that does not keep the internal watch lock held.
     pub fn current(&self) -> InternalDisks {
         let disks = match &self.inner {
-            InternalDisksReceiverInner::Real(rx) => Arc::clone(&*rx.borrow()),
+            InternalDisksReceiverInner::Real(rx) => {
+                Arc::new(rx.borrow().iter().map(From::from).collect())
+            }
             #[cfg(any(test, feature = "testing"))]
             InternalDisksReceiverInner::FakeStatic(disks) => Arc::clone(disks),
             #[cfg(any(test, feature = "testing"))]
@@ -153,9 +206,9 @@ impl InternalDisksReceiver {
     /// returns an owned value that does not keep the internal watch lock held.
     pub fn current_and_update(&mut self) -> InternalDisks {
         let disks = match &mut self.inner {
-            InternalDisksReceiverInner::Real(rx) => {
-                Arc::clone(&*rx.borrow_and_update())
-            }
+            InternalDisksReceiverInner::Real(rx) => Arc::new(
+                rx.borrow_and_update().iter().map(From::from).collect(),
+            ),
             #[cfg(any(test, feature = "testing"))]
             InternalDisksReceiverInner::FakeStatic(disks) => Arc::clone(disks),
             #[cfg(any(test, feature = "testing"))]
@@ -166,6 +219,27 @@ impl InternalDisksReceiver {
         InternalDisks { disks, mount_config: Arc::clone(&self.mount_config) }
     }
 
+    /// Get the raw set of `Disk`s.
+    ///
+    /// This operation will panic if this receiver is backed by a fake set of
+    /// disk properties (e.g., as created by `Self::fake_static()`). It is also
+    /// only exposed to this crate; external callers should only operate on the
+    /// view provided by `InternalDisks`. Internal-to-this-crate callers should
+    /// take care not to hold the returned reference too long (as this keeps
+    /// the watch channel locked).
+    pub(crate) fn borrow_and_update_raw_disks(
+        &mut self,
+    ) -> watch::Ref<'_, IdMap<InternalDisk>> {
+        match &mut self.inner {
+            InternalDisksReceiverInner::Real(rx) => rx.borrow_and_update(),
+            #[cfg(any(test, feature = "testing"))]
+            InternalDisksReceiverInner::FakeStatic(_)
+            | InternalDisksReceiverInner::FakeDynamic(_) => panic!(
+                "borrow_and_update_raw_disks not supported by fake impls"
+            ),
+        }
+    }
+
     /// Wait for changes to the set of managed internal disks.
     pub async fn changed(&mut self) -> Result<(), RecvError> {
         match &mut self.inner {
@@ -183,29 +257,51 @@ impl InternalDisksReceiver {
 
     /// Wait until the boot disk is managed, returning its identity.
     ///
     /// Internally updates the most-recently-seen value.
-    pub(crate) async fn wait_for_boot_disk(&mut self) -> DiskIdentity {
-        let disks_rx = match &mut self.inner {
-            InternalDisksReceiverInner::Real(rx) => rx,
+    pub(crate) async fn wait_for_boot_disk(&mut self) -> Arc<DiskIdentity> {
+        match &mut self.inner {
+            InternalDisksReceiverInner::Real(disks_rx) => loop {
+                let disks = disks_rx.borrow_and_update();
+                if let Some(disk) = disks.iter().find(|d| d.is_boot_disk()) {
+                    return Arc::clone(&disk.identity);
+                }
+                mem::drop(disks);
+
+                disks_rx
+                    .changed()
+                    .await
+                    .expect("InternalDisks task never dies");
+            },
             #[cfg(any(test, feature = "testing"))]
             InternalDisksReceiverInner::FakeStatic(disks) => {
-                if let Some(disk) = disks.iter().find(|d| d.is_boot_disk) {
-                    return (*disk.identity).clone();
+                if let Some(disk) = disks.iter().find(|d| d.is_boot_disk()) {
+                    return Arc::clone(&disk.id.identity);
                 }
                 panic!("fake InternalDisksReceiver has no boot disk")
             }
             #[cfg(any(test, feature = "testing"))]
-            InternalDisksReceiverInner::FakeDynamic(rx) => rx,
-        };
-        loop {
-            let disks = disks_rx.borrow_and_update();
-            if let Some(disk) = disks.iter().find(|d| d.is_boot_disk) {
-                return (*disk.identity).clone();
-            }
-            mem::drop(disks);
+            InternalDisksReceiverInner::FakeDynamic(disks_rx) => loop {
+                let disks = disks_rx.borrow_and_update();
+                if let Some(disk) = disks.iter().find(|d| d.is_boot_disk()) {
+                    return Arc::clone(&disk.id.identity);
+                }
+                mem::drop(disks);
 
-            disks_rx.changed().await.expect("InternalDisks task never dies");
+                disks_rx
+                    .changed()
+                    .await
+                    .expect("InternalDisks task never dies");
+            },
         }
     }
+
+    /// Return the most recent set of internal disk reconciliation errors.
+    ///
+    /// Note that this error set is not atomically collected with the
+    /// `current()` set of disks. It is only useful for inventory reporting
+    /// purposes.
+    pub(crate) fn errors(&self) -> Arc<BTreeMap<DiskIdentity, DiskError>> {
+        Arc::clone(&*self.errors_rx.borrow())
+    }
 }
 
 pub struct InternalDisks {
@@ -220,7 +316,7 @@ impl InternalDisks {
 
     pub fn boot_disk_zpool(&self) -> Option<&ZpoolName> {
         self.disks.iter().find_map(|d| {
-            if d.is_boot_disk { Some(&d.zpool_name) } else { None }
+            if d.is_boot_disk() { Some(&d.zpool_name) } else { None }
         })
     }
 
@@ -290,12 +386,13 @@ impl InternalDisks {
     }
 }
 
-// A summary of a [`Disk`] without providing any of the associated
-// functionality.
-#[derive(Debug)]
+// A subset of `Disk` properties. We store this in `InternalDisks` instead of
+// `Disk`s both to avoid exposing raw `Disk`s outside this crate and to support
+// easier faking for tests.
+#[derive(Debug, Clone)]
 struct InternalDiskDetails {
-    identity: Arc<DiskIdentity>,
+    id: InternalDiskDetailsId,
     zpool_name: ZpoolName,
-    is_boot_disk: bool,
     // These two fields are optional because they don't exist for synthetic
     // disks.
@@ -307,10 +404,7 @@ impl IdMappable for InternalDiskDetails {
     type Id = InternalDiskDetailsId;
 
     fn id(&self) -> Self::Id {
-        InternalDiskDetailsId {
-            identity: Arc::clone(&self.identity),
-            is_boot_disk: self.is_boot_disk,
-        }
+        self.id.clone()
     }
 }
 
@@ -332,28 +426,42 @@ impl From<&'_ Disk> for InternalDiskDetails {
         };
 
         Self {
-            identity: Arc::new(disk.identity().clone()),
+            id: InternalDiskDetailsId {
+                identity: Arc::new(disk.identity().clone()),
+                is_boot_disk: disk.is_boot_disk(),
+            },
             zpool_name: *disk.zpool_name(),
-            is_boot_disk: disk.is_boot_disk(),
             slot,
             raw_devfs_path,
         }
     }
 }
 
+impl From<&'_ InternalDisk> for InternalDiskDetails {
+    fn from(disk: &'_ InternalDisk) -> Self {
+        Self::from(&disk.disk)
+    }
+}
+
 impl InternalDiskDetails {
     #[cfg(any(test, feature = "testing"))]
     fn fake_details(identity: DiskIdentity, zpool_name: ZpoolName) -> Self {
+        // We can expand the interface for fake disks if we need to be able to
+        // specify more of these properties in future tests.
         Self {
-            identity: Arc::new(identity),
+            id: InternalDiskDetailsId {
+                identity: Arc::new(identity),
+                is_boot_disk: false,
+            },
             zpool_name,
-            // We can expand the interface for fake disks if we need to
-            // be able to specify these fields in future tests.
-            is_boot_disk: false,
             slot: None,
             raw_devfs_path: None,
         }
     }
+
+    fn is_boot_disk(&self) -> bool {
+        self.id.is_boot_disk
+    }
 }
 
 // Special ID type for `InternalDiskDetails` that lets us guarantee we sort boot
@@ -386,28 +494,332 @@ impl cmp::PartialOrd for InternalDiskDetailsId {
     }
 }
 
+// Wrapper around a `Disk` with a cheaply-cloneable identity.
+#[derive(Debug, Clone)]
+pub(crate) struct InternalDisk {
+    identity: Arc<DiskIdentity>,
+    disk: Disk,
+}
+
+impl From<Disk> for InternalDisk {
+    fn from(disk: Disk) -> Self {
+        // Invariant: We should only be constructed for internal disks; our
+        // caller should have already filtered out all external disks.
+        match disk.variant() {
+            DiskVariant::M2 => (),
+            DiskVariant::U2 => panic!("InternalDisk called with external Disk"),
+        }
+        Self { identity: Arc::new(disk.identity().clone()), disk }
+    }
+}
+
+impl Deref for InternalDisk {
+    type Target = Disk;
+
+    fn deref(&self) -> &Self::Target {
+        &self.disk
+    }
+}
+
+impl IdMappable for InternalDisk {
+    type Id = Arc<DiskIdentity>;
+
+    fn id(&self) -> Self::Id {
+        Arc::clone(&self.identity)
+    }
+}
+
 struct InternalDisksTask {
-    disks_tx: watch::Sender<Arc<IdMap<InternalDiskDetails>>>,
+    // Input channel for changes to the raw disks sled-agent sees.
     raw_disks_rx: watch::Receiver<IdMap<RawDiskWithId>>,
-    mount_config: Arc<MountConfig>,
 
-    // Invokes dumpadm(8) and savecore(8) when new disks are encountered
-    dump_setup: DumpSetup,
+    // The set of disks we've successfully started managing.
+    disks_tx: watch::Sender<IdMap<InternalDisk>>,
+
+    // Output channel summarizing any adoption errors.
+    //
+    // Because this is a different channel from `disks_tx`, it isn't possible
+    // for a caller to get an atomic view of "the current disks and errors", so
+    // it's possible a disk could appear in both or neither if snapshots are
+    // taken at just the wrong time. This isn't great, but is maybe better than
+    // squishing both into a single channel. If we combined them, then we'd
+    // wake up any receivers that really only care about disk changes any time
+    // we had changes to errors. (And if we get an error, we'll retry; if it
+    // continues to fail, we'd continue to wake up receivers after each retry,
+    // because we don't have an easy way of checking whether errors are equal.)
+    //
+    // The errors are only reported in inventory, which already has similar
+    // non-atomic properties across other fields all reported together.
+    errors_tx: watch::Sender<Arc<BTreeMap<DiskIdentity, DiskError>>>,
+
+    mount_config: Arc<MountConfig>,
 
     log: Logger,
 }
 
 impl InternalDisksTask {
-    async fn run(self) {
-        unimplemented!()
+    async fn run<T: DiskAdopter>(mut self, disk_adopter: T) {
+        // If disk adoption fails, the most likely cause is that the disk is
+        // not formatted correctly, and we have no automated means to recover
+        // that. However, it's always possible we could fail to adopt due to
+        // some transient error. Construct an exponential backoff that scales
+        // up to waiting a minute between attempts; that should let us handle
+        // any short transient errors without constantly retrying a doomed
+        // operation.
+        //
+        // We could be smarter here and check for retryable vs non-retryable
+        // adoption errors.
+        let mut next_backoff = ExponentialBackoffBuilder::new()
+            .with_initial_interval(Duration::from_secs(1))
+            .with_max_interval(Duration::from_secs(60))
+            .with_max_elapsed_time(None)
+            .build();
+
+        loop {
+            // Collect the internal disks to avoid holding the watch channel
+            // lock while we attempt to adopt any new disks.
+            let internal_raw_disks = self
+                .raw_disks_rx
+                .borrow_and_update()
+                .iter()
+                .filter(|disk| match disk.variant() {
+                    DiskVariant::U2 => false,
+                    DiskVariant::M2 => true,
+                })
+                .cloned()
+                .collect::<IdMap<_>>();
+
+            // Perform actual reconciliation, updating our output watch
+            // channels.
+            self.reconcile_internal_disks(internal_raw_disks, &disk_adopter)
+                .await;
+
+            // If any adoption attempt failed, we'll retry; otherwise we'll
+            // wait for a change in `raw_disks_rx`.
+            let retry_timeout = if self.errors_tx.borrow().is_empty() {
+                next_backoff.reset();
+                Either::Left(future::pending())
+            } else {
+                let timeout = next_backoff
+                    .next_backoff()
+                    .expect("backoff configured with no max elapsed time");
+                info!(
+                    self.log,
+                    "Will retry failed disk adoption after {:?}", timeout
+                );
+                Either::Right(tokio::time::sleep(timeout))
+            };
+
+            // Wait until either we need to retry (if the above adoption
+            // failed) or there's a change in the raw disks.
+            tokio::select! {
+                // Cancel-safe: This is either `pending()` (never ready) or
+                // `sleep()` (cancel-safe).
+                _ = retry_timeout => {
+                    continue;
+                }
+                // Cancel-safe per docs on `changed()`.
+                result = self.raw_disks_rx.changed() => {
+                    match result {
+                        Ok(()) => (),
+                        Err(_) => {
+                            // The `RawDisk` watch channel should never be
+                            // closed in production, but could be in tests. All
+                            // we can do here is exit; no further updates are
+                            // coming.
+                            error!(
+                                self.log,
+                                "InternalDisksTask exiting unexpectedly: \
+                                 RawDisk watch channel closed by sender"
+                            );
+                            return;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    async fn reconcile_internal_disks<T: DiskAdopter>(
+        &mut self,
+        internal_raw_disks: IdMap<RawDiskWithId>,
+        disk_adopter: &T,
+    ) {
+        info!(
+            self.log,
+            "Attempting to ensure adoption of {} internal disks",
+            internal_raw_disks.len(),
+        );
+
+        // We don't want to hold the `disks_tx` lock while we adopt disks, so
+        // first capture a snapshot of which disks we have.
+        let current_disks =
+            self.disks_tx.borrow().keys().cloned().collect::<BTreeSet<_>>();
+
+        // Build the list of disks that are gone.
+        let mut disks_to_remove = Vec::new();
+        for disk_id in &current_disks {
+            if !internal_raw_disks.contains_key(disk_id) {
+                disks_to_remove.push(disk_id);
+            }
+        }
+
+        // Build the list of disks that exist, divided into three categories:
+        //
+        // 1. We're already managing the disk (we only need to check whether
+        //    there have been updates to its properties)
+        // 2. We're not yet managing the disk, and succeeded in adopting it
+        // 3. We're not yet managing the disk, but failed to adopt it
+        let mut disks_to_maybe_update = Vec::new();
+        let mut disks_to_insert = Vec::new();
+        let mut errors = BTreeMap::default();
+
+        for raw_disk in internal_raw_disks {
+            // If we already have this disk, we'll just check whether any
+            // properties (e.g., firmware revisions) have been changed.
+            if current_disks.contains(raw_disk.identity()) {
+                disks_to_maybe_update.push(raw_disk);
+                continue;
+            }
+
+            // This is a new disk: attempt to adopt it.
+            let identity = raw_disk.identity().clone();
+            let adopt_result = disk_adopter
+                .adopt_disk(raw_disk.into(), &self.mount_config, &self.log)
+                .await;
+
+            match adopt_result {
+                Ok(disk) => {
+                    info!(
+                        self.log, "Adopted new internal disk";
+                        "identity" => ?identity,
+                    );
+                    disks_to_insert.push(disk);
+                }
+                Err(err) => {
+                    warn!(
+                        self.log, "Internal disk adoption failed";
+                        "identity" => ?identity,
+                        InlineErrorChain::new(&err),
+                    );
+                    errors.insert(identity, err);
+                }
+            }
+        }
+
+        // Possibly update the set of disks based on the results of the above.
+        self.disks_tx.send_if_modified(|disks| {
+            let mut changed = false;
+
+            // Do any raw removals and additions.
+            for disk_id in disks_to_remove {
+                disks.remove(disk_id);
+                changed = true;
+            }
+            for new_disk in disks_to_insert {
+                disks.insert(new_disk.into());
+                changed = true;
+            }
+
+            // Check for property updates to existing disks.
+            for raw_disk in disks_to_maybe_update {
+                // We only push into `disks_to_maybe_update` if this disk
+                // existed when we looked at `disks_tx` above, and this is the
+                // only spot where we change it. It must still exist now.
+                let existing_disk = disks
+                    .get(raw_disk.identity())
+                    .expect("disk should still be present");
+                let existing_raw_disk =
+                    RawDisk::from(existing_disk.disk.clone());
+
+                if *raw_disk != existing_raw_disk {
+                    // The only property we expect to change is the firmware
+                    // metadata. Update that and check again; if they're still
+                    // not equal, something weird is going on. At least log a
+                    // warning.
+                    let mut updated_disk = existing_disk.clone();
+                    updated_disk.disk.update_firmware_metadata(&raw_disk);
+
+                    if *raw_disk == RawDisk::from(updated_disk.disk.clone()) {
+                        info!(
+                            self.log, "Updated disk firmware metadata";
+                            "old" => ?updated_disk.firmware(),
+                            "new" => ?raw_disk.firmware(),
+                            "identity" => ?updated_disk.id(),
+                        );
+                    } else {
+                        warn!(
+                            self.log,
+                            "Updated disk firmware metadata, \
+                             but other disk properties are different!";
+                            "old" => ?existing_raw_disk,
+                            "new" => ?*raw_disk,
+                        );
+                    }
+
+                    disks.insert(updated_disk);
+                    changed = true;
+                }
+            }
+
+            changed
+        });
+
+        // Update our output error watch channel if we have any errors or we're
+        // going from "some errors" to "no errors". It'd be nice to only send
+        // out a modification here if the error _content_ changed, but we don't
+        // derive `PartialEq` on error types.
+        self.errors_tx.send_if_modified(|errors_tx| {
+            if !errors.is_empty() || !errors_tx.is_empty() {
+                *errors_tx = Arc::new(errors);
+                true
+            } else {
+                false
+            }
+        });
+    }
+}
+
+/// Helper to allow unit tests to run without interacting with the real
+/// [`Disk`] implementation. In production, the only implementor of this trait
+/// is [`RealDiskAdopter`].
+trait DiskAdopter: Send + Sync + 'static {
+    fn adopt_disk(
+        &self,
+        raw_disk: RawDisk,
+        mount_config: &MountConfig,
+        log: &Logger,
+    ) -> impl Future<Output = Result<Disk, DiskError>> + Send;
+}
+
+struct RealDiskAdopter;
+
+impl DiskAdopter for RealDiskAdopter {
+    async fn adopt_disk(
+        &self,
+        raw_disk: RawDisk,
+        mount_config: &MountConfig,
+        log: &Logger,
+    ) -> Result<Disk, DiskError> {
+        let pool_id = None; // control plane doesn't manage M.2s
+        let key_requester = None; // M.2s are unencrypted
+        Disk::new(log, mount_config, raw_disk, pool_id, key_requester).await
     }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use assert_matches::assert_matches;
+    use omicron_test_utils::dev;
+    use omicron_test_utils::dev::poll::CondCheckError;
+    use omicron_test_utils::dev::poll::wait_for_watch_channel_condition;
     use omicron_uuid_kinds::ZpoolUuid;
     use proptest::sample::size_range;
+    use sled_hardware::DiskFirmware;
+    use sled_hardware::DiskPaths;
+    use sled_hardware::PooledDisk;
+    use sled_hardware::UnparsedDisk;
+    use std::sync::Mutex;
     use test_strategy::Arbitrary;
     use test_strategy::proptest;
 
@@ -419,6 +831,19 @@ mod tests {
         serial: String,
     }
 
+    impl From<ArbitraryInternalDiskDetailsId> for InternalDiskDetailsId {
+        fn from(id: ArbitraryInternalDiskDetailsId) -> Self {
+            InternalDiskDetailsId {
+                identity: Arc::new(DiskIdentity {
+                    vendor: id.vendor,
+                    model: id.model,
+                    serial: id.serial,
+                }),
+                is_boot_disk: id.is_boot_disk,
+            }
+        }
+    }
+
     #[proptest]
     fn boot_disks_sort_ahead_of_non_boot_disks_in_id_map(
         #[any(size_range(2..4).lift())] values: Vec<
@@ -427,14 +852,9 @@ mod tests {
     ) {
         let disk_map: IdMap<_> = values
             .into_iter()
-            .map(|value| InternalDiskDetails {
-                identity: Arc::new(DiskIdentity {
-                    vendor: value.vendor,
-                    model: value.model,
-                    serial: value.serial,
-                }),
+            .map(|id| InternalDiskDetails {
+                id: id.into(),
                 zpool_name: ZpoolName::new_internal(ZpoolUuid::new_v4()),
-                is_boot_disk: value.is_boot_disk,
                 slot: None,
                 raw_devfs_path: None,
             })
@@ -444,11 +864,317 @@ mod tests {
         // after a non-boot disk.
         let mut saw_non_boot_disk = false;
         for disk in disk_map.iter() {
-            if disk.is_boot_disk {
+            if disk.is_boot_disk() {
                 assert!(!saw_non_boot_disk);
             } else {
                 saw_non_boot_disk = true;
             }
         }
     }
+
+    #[derive(Default)]
+    struct TestDiskAdopter {
+        inner: Mutex<TestDiskAdopterInner>,
+    }
+
+    #[derive(Default)]
+    struct TestDiskAdopterInner {
+        requests: Vec<RawDisk>,
+        should_fail_requests:
+            BTreeMap<DiskIdentity, Box<dyn Fn() -> DiskError + Send + 'static>>,
+    }
+
+    impl DiskAdopter for Arc<TestDiskAdopter> {
+        async fn adopt_disk(
+            &self,
+            raw_disk: RawDisk,
+            _mount_config: &MountConfig,
+            _log: &Logger,
+        ) -> Result<Disk, DiskError> {
+            // InternalDisks should only adopt M2 disks
+            assert_eq!(raw_disk.variant(), DiskVariant::M2);
+
+            let mut inner = self.inner.lock().unwrap();
+            inner.requests.push(raw_disk.clone());
+
+            if let Some(make_err) =
+                inner.should_fail_requests.get(raw_disk.identity())
+            {
+                return Err(make_err());
+            }
+
+            Ok(Disk::Real(PooledDisk {
+                paths: DiskPaths {
+                    devfs_path: "/fake-disk".into(),
+                    dev_path: None,
+                },
+                slot: raw_disk.slot(),
+                variant: raw_disk.variant(),
+                identity: raw_disk.identity().clone(),
+                is_boot_disk: raw_disk.is_boot_disk(),
+                partitions: vec![],
+                zpool_name: ZpoolName::new_internal(ZpoolUuid::new_v4()),
+                firmware: raw_disk.firmware().clone(),
+            }))
+        }
+    }
+
+    fn any_mount_config() -> MountConfig {
+        MountConfig {
+            root: "/tmp/sled-agent-config-reconciler/internal-disks-tests"
+                .into(),
+            synthetic_disk_root:
+                "/tmp/sled-agent-config-reconciler/internal-disks-tests".into(),
+        }
+    }
+
+    fn new_raw_test_disk(variant: DiskVariant, serial: &str) -> RawDisk {
+        RawDisk::Real(UnparsedDisk::new(
+            "/test-devfs".into(),
+            None,
+            0,
+            variant,
+            DiskIdentity {
+                vendor: "test".into(),
+                model: "test".into(),
+                serial: serial.into(),
+            },
+            false,
+            DiskFirmware::new(0, None, false, 1, vec![]),
+        ))
+    }
+
+    #[tokio::test]
+    async fn only_m2_disks_are_adopted() {
+        let logctx = dev::test_setup_log("only_m2_disks_are_adopted");
+
+        let (raw_disks_tx, raw_disks_rx) = watch::channel(IdMap::new());
+        let disk_adopter = Arc::new(TestDiskAdopter::default());
+        let mut disks_rx = InternalDisksReceiver::spawn_with_disk_adopter(
+            Arc::new(any_mount_config()),
+            raw_disks_rx,
+            &logctx.log,
+            Arc::clone(&disk_adopter),
+        );
+
+        // There should be no disks to start.
+        assert!(disks_rx.current_and_update().disks.is_empty());
+
+        // Add four disks: two M.2 and two U.2.
+        raw_disks_tx.send_modify(|disks| {
+            for disk in [
+                new_raw_test_disk(DiskVariant::M2, "m2-0"),
+                new_raw_test_disk(DiskVariant::U2, "u2-0"),
+                new_raw_test_disk(DiskVariant::M2, "m2-1"),
+                new_raw_test_disk(DiskVariant::U2, "u2-1"),
+            ] {
+                disks.insert(disk.into());
+            }
+        });
+
+        // Wait for the adopted disks to change; this should happen nearly
+        // immediately, but we'll put a timeout on to avoid hanging if
+        // something is broken.
+        tokio::time::timeout(Duration::from_secs(60), disks_rx.changed())
+            .await
+            .expect("disks changed before timeout")
+            .expect("changed() succeeded");
+
+        // We should see the two M.2s only.
+        let adopted_disks = Arc::clone(&disks_rx.current_and_update().disks);
+        let serials = adopted_disks
+            .iter()
+            .map(|d| d.id.identity.serial.as_str())
+            .collect::<BTreeSet<_>>();
+        let expected_serials =
+            ["m2-0", "m2-1"].into_iter().collect::<BTreeSet<_>>();
+        assert_eq!(serials, expected_serials);
+
+        // Our test disk adopter should also have only seen two requests.
+        let adoption_inner = disk_adopter.inner.lock().unwrap();
+        let adopted_serials = adoption_inner
+            .requests
+            .iter()
+            .map(|d| d.identity().serial.as_str())
+            .collect::<BTreeSet<_>>();
+        assert_eq!(adopted_serials, expected_serials);
+
+        logctx.cleanup_successful();
+    }
+
+    #[tokio::test]
+    async fn firmware_updates_are_propagated() {
+        let logctx = dev::test_setup_log("firmware_updates_are_propagated");
+
+        // Setup: one disk.
+        let mut raw_disk = new_raw_test_disk(DiskVariant::M2, "test-m2");
+        let (raw_disks_tx, raw_disks_rx) =
+            watch::channel([raw_disk.clone().into()].into_iter().collect());
+        let disk_adopter = Arc::new(TestDiskAdopter::default());
+        let mut disks_rx = InternalDisksReceiver::spawn_with_disk_adopter(
+            Arc::new(any_mount_config()),
+            raw_disks_rx,
+            &logctx.log,
+            Arc::clone(&disk_adopter),
+        );
+
+        // Wait for the test disk to be adopted.
+        tokio::time::timeout(Duration::from_secs(60), disks_rx.changed())
+            .await
+            .expect("disks changed before timeout")
+            .expect("changed() succeeded");
+        assert_eq!(disks_rx.current_and_update().disks.len(), 1);
+
+        // Modify the firmware of the raw disk and publish that change.
+        let new_firmware = DiskFirmware::new(
+            raw_disk.firmware().active_slot().wrapping_add(1),
+            None,
+            false,
+            1,
+            Vec::new(),
+        );
+        *raw_disk.firmware_mut() = new_firmware;
+        raw_disks_tx.send_modify(|disks| {
+            disks.insert(raw_disk.clone().into());
+        });
+
+        // Wait for the change to be noticed.
+        tokio::time::timeout(Duration::from_secs(60), disks_rx.changed())
+            .await
+            .expect("disks changed before timeout")
+            .expect("changed() succeeded");
+
+        // We should still only have one disk, and it should have the new
+        // firmware.
+        let current = disks_rx.borrow_and_update_raw_disks();
+        assert_eq!(current.len(), 1);
+        let adopted_disk = current.iter().next().unwrap();
+        assert_eq!(adopted_disk.firmware(), raw_disk.firmware());
+
+        // Our test disk adopter should only have seen a single request:
+        // changing the firmware doesn't require readoption.
+        assert_eq!(disk_adopter.inner.lock().unwrap().requests.len(), 1);
+
+        logctx.cleanup_successful();
+    }
+
+    #[tokio::test]
+    async fn removed_disks_are_propagated() {
+        let logctx = dev::test_setup_log("removed_disks_are_propagated");
+
+        // Setup: two disks.
+        let raw_disk1 = new_raw_test_disk(DiskVariant::M2, "m2-1");
+        let raw_disk2 = new_raw_test_disk(DiskVariant::M2, "m2-2");
+        let (raw_disks_tx, raw_disks_rx) = watch::channel(
+            [&raw_disk1, &raw_disk2]
+                .into_iter()
+                .cloned()
+                .map(From::from)
+                .collect(),
+        );
+        let disk_adopter = Arc::new(TestDiskAdopter::default());
+        let mut disks_rx = InternalDisksReceiver::spawn_with_disk_adopter(
+            Arc::new(any_mount_config()),
+            raw_disks_rx,
+            &logctx.log,
+            Arc::clone(&disk_adopter),
+        );
+
+        // Wait for the test disks to be adopted.
+        tokio::time::timeout(Duration::from_secs(60), disks_rx.changed())
+            .await
+            .expect("disks changed before timeout")
+            .expect("changed() succeeded");
+        assert_eq!(disks_rx.current_and_update().disks.len(), 2);
+
+        // Remove test disk 1.
+        raw_disks_tx.send_modify(|raw_disks| {
+            raw_disks.remove(raw_disk1.identity());
+        });
+
+        // Wait for the removal to be propagated.
+        tokio::time::timeout(Duration::from_secs(60), disks_rx.changed())
+            .await
+            .expect("disks changed before timeout")
+            .expect("changed() succeeded");
+        let adopted_disks = Arc::clone(&disks_rx.current_and_update().disks);
+        let serials = adopted_disks
+            .iter()
+            .map(|d| d.id.identity.serial.as_str())
+            .collect::<BTreeSet<_>>();
+        let expected_serials = ["m2-2"].into_iter().collect::<BTreeSet<_>>();
+        assert_eq!(serials, expected_serials);
+
+        logctx.cleanup_successful();
+    }
+
+    #[tokio::test]
+    async fn failures_are_retried() {
+        let logctx = dev::test_setup_log("failures_are_retried");
+
+        // Setup: one disk, and configure the disk adopter to fail.
+        let raw_disk = new_raw_test_disk(DiskVariant::M2, "test-m2");
+        let (_raw_disks_tx, raw_disks_rx) = watch::channel(
+            [&raw_disk].into_iter().cloned().map(From::from).collect(),
+        );
+        let disk_adopter = Arc::new(TestDiskAdopter::default());
+        disk_adopter.inner.lock().unwrap().should_fail_requests.insert(
+            raw_disk.identity().clone(),
+            Box::new(|| {
+                DiskError::PooledDisk(PooledDiskError::UnexpectedVariant)
+            }),
+        );
+
+        let mut disks_rx = InternalDisksReceiver::spawn_with_disk_adopter(
+            Arc::new(any_mount_config()),
+            raw_disks_rx,
+            &logctx.log,
+            Arc::clone(&disk_adopter),
+        );
+
+        // Wait for the error to be reported.
+        tokio::time::timeout(
+            Duration::from_secs(60),
+            disks_rx.errors_rx.changed(),
+        )
+        .await
+        .expect("errors changed before timeout")
+        .expect("changed() succeeded");
+
+        let errors = Arc::clone(&*disks_rx.errors_rx.borrow_and_update());
+        assert_eq!(errors.len(), 1);
+        assert_matches!(
+            errors.get(raw_disk.identity()),
+            Some(DiskError::PooledDisk(PooledDiskError::UnexpectedVariant))
+        );
+
+        // Change our disk adopter to allow these requests to succeed.
+        disk_adopter.inner.lock().unwrap().should_fail_requests.clear();
+
+        // Wait for the disk to be adopted.
+        tokio::time::timeout(Duration::from_secs(60), disks_rx.changed())
+            .await
+            .expect("disks changed before timeout")
+            .expect("changed() succeeded");
+        assert_eq!(disks_rx.current_and_update().disks.len(), 1);
+
+        // Wait for the errors to be cleared. This is a separate channel, so
+        // it's possible we're racing, but this should happen quickly after
+        // the disk is adopted.
+        wait_for_watch_channel_condition(
+            &mut disks_rx.errors_rx,
+            async |errors| {
+                if errors.is_empty() {
+                    Ok(())
+                } else {
+                    Err(CondCheckError::<()>::NotYet)
+                }
+            },
+            Duration::from_secs(30),
+        )
+        .await
+        .expect("error should be gone");
+
+        logctx.cleanup_successful();
+    }
 }
diff --git a/sled-agent/config-reconciler/src/raw_disks.rs b/sled-agent/config-reconciler/src/raw_disks.rs
index 7e2956162f..6c984e0832 100644
--- a/sled-agent/config-reconciler/src/raw_disks.rs
+++ b/sled-agent/config-reconciler/src/raw_disks.rs
@@ -145,6 +145,12 @@ impl From<RawDisk> for RawDiskWithId {
     }
 }
 
+impl From<RawDiskWithId> for RawDisk {
+    fn from(disk: RawDiskWithId) -> Self {
+        disk.disk
+    }
+}
+
 impl Deref for RawDiskWithId {
     type Target = RawDisk;
 
diff --git a/sled-agent/config-reconciler/src/reconciler_task.rs b/sled-agent/config-reconciler/src/reconciler_task.rs
index e6e1bf7a21..f065f1595b 100644
--- a/sled-agent/config-reconciler/src/reconciler_task.rs
+++ b/sled-agent/config-reconciler/src/reconciler_task.rs
@@ -256,6 +256,10 @@ struct ReconcilerTask<T> {
     currently_managed_zpools_tx: watch::Sender<Arc<CurrentlyManagedZpools>>,
     sled_agent_facilities: T,
     log: Logger,
+    // TODO where do we want to do dump setup? Needs both internal and external
+    // disks. Maybe this task, or maybe a task just for dump setup?
+    // Invokes dumpadm(8) and savecore(8) when new disks are encountered
+    // dump_setup: DumpSetup,
 }
 
 impl<T: SledAgentFacilities> ReconcilerTask<T> {
diff --git a/sled-storage/src/disk.rs b/sled-storage/src/disk.rs
index c5858eca68..6391152c19 100644
--- a/sled-storage/src/disk.rs
+++ b/sled-storage/src/disk.rs
@@ -383,7 +383,7 @@ impl Disk {
         }
     }
 
-    pub(crate) fn update_firmware_metadata(&mut self, raw_disk: &RawDisk) {
+    pub fn update_firmware_metadata(&mut self, raw_disk: &RawDisk) {
         match self {
             Disk::Real(pooled_disk) => {
                 pooled_disk.firmware = raw_disk.firmware().clone();
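
A few of the concurrency patterns in this patch generalize well beyond sled-agent. The sketches that follow are editorial illustrations only: they are not part of the diff, and every name in them is invented. First, the retry structure in `InternalDisksTask::run` selects between a backoff-driven sleep and a watch-channel change, so failed work is retried on a schedule while new input cuts the wait short. A minimal, self-contained version using plain `tokio` (the real task uses `omicron_common::backoff` and `futures::future::Either` rather than the hand-rolled doubling here):

    use std::time::Duration;
    use tokio::sync::watch;

    /// Toy stand-in for disk adoption: pretend some values fail transiently.
    fn process(v: u32) -> Result<(), String> {
        if v != 0 && v % 3 == 0 {
            Err(format!("transient failure on {v}"))
        } else {
            Ok(())
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = watch::channel(0u32);

        // Producer: publish a few values, then drop the sender.
        tokio::spawn(async move {
            for v in 1..=5 {
                tokio::time::sleep(Duration::from_millis(50)).await;
                let _ = tx.send(v);
            }
        });

        let mut backoff = Duration::from_millis(10);
        loop {
            let value = *rx.borrow_and_update();

            if process(value).is_err() {
                // Retry after a backoff, but let a new value cut the wait
                // short, just like the `tokio::select!` in the real task.
                tokio::select! {
                    _ = tokio::time::sleep(backoff) => {}
                    res = rx.changed() => {
                        if res.is_err() {
                            break; // sender gone; nothing new will arrive
                        }
                    }
                }
                backoff = (backoff * 2).min(Duration::from_secs(1));
            } else {
                backoff = Duration::from_millis(10); // success: reset backoff
                if rx.changed().await.is_err() {
                    break; // sender gone and last value processed
                }
            }
        }
    }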
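
Second, the forwarding task in the `fake_dynamic` constructor derives one watch channel from another by re-mapping on every change and exiting when either endpoint closes. A self-contained sketch, with `u32` payloads and a `* 10` mapping standing in for the `Disk` to `InternalDiskDetails` conversion:

    use tokio::sync::watch;

    #[tokio::main]
    async fn main() {
        // Source channel of raw values; mapped channel of derived values.
        let (src_tx, mut src_rx) = watch::channel(1u32);
        let (mapped_tx, mut mapped_rx) = watch::channel(*src_rx.borrow() * 10);

        // Forwarding task: re-derive and publish whenever the source changes.
        // Exits when the source sender or all mapped receivers are dropped.
        tokio::spawn(async move {
            while let Ok(()) = src_rx.changed().await {
                let mapped = *src_rx.borrow_and_update() * 10;
                if mapped_tx.send(mapped).is_err() {
                    break; // all receivers dropped
                }
            }
        });

        src_tx.send(7).unwrap();
        mapped_rx.changed().await.unwrap();
        assert_eq!(*mapped_rx.borrow(), 70);
    }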
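
Third, the `errors_tx.closed().await` idiom used by the fake constructors: parking a sender in a task that waits for `closed()` keeps a never-updated watch channel open, so receivers never observe a closed-channel error, without leaking the task forever, because `closed()` resolves once every receiver is dropped. A sketch with an invented `Vec<String>` payload:

    use std::sync::Arc;
    use tokio::sync::watch;

    #[tokio::main]
    async fn main() {
        // A channel whose value will never change.
        let (tx, rx) = watch::channel(Arc::new(Vec::<String>::new()));

        // Park the sender: this task exits only when all receivers are gone.
        let parked = tokio::spawn(async move {
            tx.closed().await;
        });

        assert!(rx.borrow().is_empty());
        drop(rx); // last receiver gone...
        parked.await.unwrap(); // ...so the parked task exits
    }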