From 6dcf637933e2cb2f4972c0e53b0411c9fc813606 Mon Sep 17 00:00:00 2001 From: Igor Gutorov Date: Mon, 30 Oct 2023 01:47:18 +0300 Subject: [PATCH] Implement `Upkeep::start_pinned` API --- Cargo.toml | 1 + src/upkeep.rs | 79 ++++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 67 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 61477b2..43f29f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ prost = ["prost-types"] once_cell = "1.4" prost-types = { version = "0.11", default-features = false, optional = true } crossbeam-utils = "0.8.5" +core_affinity = "0.8" [target.'cfg(target_arch = "x86")'.dependencies] raw-cpuid = "11.0" diff --git a/src/upkeep.rs b/src/upkeep.rs index 329417d..2d45d3c 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -3,7 +3,7 @@ use std::{ fmt, io, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + mpsc, Arc, }, thread::{self, JoinHandle}, time::Duration, @@ -22,9 +22,8 @@ static GLOBAL_UPKEEP_RUNNING: AtomicBool = AtomicBool::new(false); /// [`Clock::recent`], which is updated by a background upkeep thread. That thread is configured /// and spanwed via [`Upkeep`]. /// -/// [`Upkeep`] can construct a new clock (or be passed an existing clock to use), and given an -/// update interval, and it will faithfully attempt to update the global recent time on the -/// specified interval. There is a trade-off to be struck in terms of how often the time is +/// Given an update interval, [`Upkeep`] will faithfully attempt to update the global recent time +/// on the specified interval. There is a trade-off to be struck in terms of how often the time is /// updated versus the required accuracy. Checking the time and updating the global reference is /// itself not zero-cost, and so care must be taken to analyze the number of readers in order to /// ensure that, given a particular update interval, the upkeep thread is saving more CPU time than @@ -44,7 +43,6 @@ static GLOBAL_UPKEEP_RUNNING: AtomicBool = AtomicBool::new(false); #[derive(Debug)] pub struct Upkeep { interval: Duration, - clock: Clock, } /// Handle to a running upkeep thread. @@ -58,12 +56,15 @@ pub struct Handle { } /// Errors thrown during the creation/spawning of the upkeep thread. +#[non_exhaustive] #[derive(Debug)] pub enum Error { /// An upkeep thread is already running in this process. UpkeepRunning, /// An error occurred when trying to spawn the upkeep thread. FailedToSpawnUpkeepThread(io::Error), + /// The upkeep thread could not be successfully pinned. + FailedToPinUpkeepThread, } impl fmt::Display for Error { @@ -73,6 +74,7 @@ impl fmt::Display for Error { Error::FailedToSpawnUpkeepThread(e) => { write!(f, "failed to spawn upkeep thread: {}", e) } + Error::FailedToPinUpkeepThread => write!(f, "failed to pin upkeep thread"), } } } @@ -82,23 +84,22 @@ impl std::error::Error for Error { match self { Self::UpkeepRunning => None, Self::FailedToSpawnUpkeepThread(e) => Some(e), + Self::FailedToPinUpkeepThread => None, } } } impl Upkeep { /// Creates a new [`Upkeep`]. - /// - /// This creates a new internal clock for acquiring the current time. If you have an existing - /// [`Clock`] that is already calibrated, it is slightly faster to clone it and construct the - /// builder with [`new_with_clock`](Upkeep::new_with_clock) to avoid recalibrating. pub fn new(interval: Duration) -> Upkeep { - Self::new_with_clock(interval, Clock::new()) + Upkeep { interval } } /// Creates a new [`Upkeep`] with the specified [`Clock`] instance. - pub fn new_with_clock(interval: Duration, clock: Clock) -> Upkeep { - Upkeep { interval, clock } + #[doc(hidden)] + #[deprecated = "`Upkeep::new_with_clock` is not faster than `Upkeep::new`. Use `Upkeep::new` instead."] + pub fn new_with_clock(interval: Duration, _: Clock) -> Upkeep { + Upkeep { interval } } /// Start the upkeep thread, periodically updating the global coarse time. @@ -112,20 +113,60 @@ impl Upkeep { /// If either an existing upkeep thread is running, or there was an issue when attempting to /// spawn the upkeep thread, an error variant will be returned describing the error. pub fn start(self) -> Result { + self.inner_start(None) + } + + /// Start the upkeep thread pinned to `core_id`, periodically updating the global coarse time. + /// [`Upkeep`] will construct a [`Clock`] and run calibration against the core it is + /// pinned to. Since all [`Clock`] instances share a global lazily initialized calibration, + /// users intending to use this API should avoid calling [`Clock::new`] before starting a + /// pinned [`Upkeep`] thread. + /// + /// [`Handle`] represents a drop guard for the upkeep thread if it is successfully spawned. + /// Dropping the handle will also instruct the upkeep thread to stop and exist, so the handle + /// must be held while the upkeep thread should continue to run. + /// + /// # Errors + /// + /// If either an existing upkeep thread is running, or there was an issue when attempting to + /// spawn the upkeep thread, or the upkeep thread was not successfully pinned to `core_id`, + /// an error variant will be returned describing the error. + pub fn start_pinned(self, core_id: core_affinity::CoreId) -> Result { + self.inner_start(Some(core_id)) + } + + fn inner_start(self, core_id: Option) -> Result { // If another upkeep thread is running, inform the caller. let _ = GLOBAL_UPKEEP_RUNNING .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .map_err(|_| Error::UpkeepRunning)?; let interval = self.interval; - let clock = self.clock; let done = Arc::new(AtomicBool::new(false)); let their_done = done.clone(); + let (pin_success_sender, pin_success_receiver) = mpsc::sync_channel(1); let result = thread::Builder::new() .name("quanta-upkeep".to_string()) .spawn(move || { + if let Some(core_id) = core_id { + let success = core_affinity::set_for_current(core_id); + + // Panic safety: `send` may panic if the receiver side has been dropped. + // That can happen only if the parent thread paniced before we reached this + // point. So, this (practically) never panics. + pin_success_sender.send(success).unwrap(); + + // Do not keep this thread running if pinning was requested, but we failed to + // pin. + if !success { + GLOBAL_UPKEEP_RUNNING.store(false, Ordering::SeqCst); + return; + } + } + + let clock = Clock::new(); while !their_done.load(Ordering::Acquire) { set_recent(clock.now()); @@ -141,6 +182,18 @@ impl Upkeep { GLOBAL_UPKEEP_RUNNING.store(false, Ordering::SeqCst); } + // When thread pinning is requested, verify `quanta-upkeep` has been successfully pinned. + if core_id.is_some() { + // Panic safety: `recv` may panic if the sender has disconnected, or is disconnecting + // while this is blocking. + // However, since we always send a message before the sender can be dropped, this call + // never panics. + let success = pin_success_receiver.recv().unwrap(); + if !success { + return Err(Error::FailedToPinUpkeepThread); + } + } + let handle = result?; Ok(Handle {