diff --git a/Cargo.toml b/Cargo.toml index 4d7ed33e..db915b20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,11 +91,7 @@ splits-io-api = { version = "0.4.0", optional = true } # Auto Splitting livesplit-auto-splitting = { path = "crates/livesplit-auto-splitting", version = "0.1.0", optional = true } -tokio = { version = "1.24.2", default-features = false, features = [ - "rt", - "sync", - "time", -], optional = true } +arc-swap = { version = "1.7.1", optional = true } log = { version = "0.4.14", default-features = false, optional = true } [target.'cfg(all(target_family = "wasm", target_os = "unknown"))'.dependencies] @@ -191,7 +187,7 @@ wasm-web = [ "web-sys", ] networking = ["std", "splits-io-api"] -auto-splitting = ["std", "livesplit-auto-splitting", "tokio", "log"] +auto-splitting = ["std", "livesplit-auto-splitting", "arc-swap", "log"] [lib] bench = false diff --git a/crates/livesplit-auto-splitting/src/runtime/mod.rs b/crates/livesplit-auto-splitting/src/runtime/mod.rs index f9fe891b..d47b77d6 100644 --- a/crates/livesplit-auto-splitting/src/runtime/mod.rs +++ b/crates/livesplit-auto-splitting/src/runtime/mod.rs @@ -190,7 +190,7 @@ fn single_process() -> ProcessRefreshKind { #[non_exhaustive] pub struct Config { /// This enables debug information for the WebAssembly module. This is - /// useful for debugging purposes. By default this `true` if the feature + /// useful for debugging purposes. By default this is `true` if the feature /// `debugger-support` is enabled. pub debug_info: bool, /// This enables optimizations for the WebAssembly module. This is enabled @@ -198,8 +198,8 @@ pub struct Config { /// splitter. pub optimize: bool, /// This enables backtrace details for the WebAssembly module. If a trap - /// occurs more details are printed in the backtrace. By default this `true` - /// if the feature `enhanced-backtrace` is enabled. + /// occurs more details are printed in the backtrace. By default this is + /// `true` if the feature `enhanced-backtrace` is enabled. pub backtrace_details: bool, } diff --git a/src/auto_splitting/mod.rs b/src/auto_splitting/mod.rs index 49f23440..a14bdb62 100644 --- a/src/auto_splitting/mod.rs +++ b/src/auto_splitting/mod.rs @@ -556,17 +556,21 @@ use crate::{ platform::Arc, timing::TimerPhase, }; +use arc_swap::ArcSwapOption; pub use livesplit_auto_splitting::{settings, wasi_path}; use livesplit_auto_splitting::{ - AutoSplitter, Config, CreationError, InterruptHandle, LogLevel, Timer as AutoSplitTimer, - TimerState, + AutoSplitter, Config, CreationError, LogLevel, Timer as AutoSplitTimer, TimerState, }; use snafu::Snafu; -use std::{fmt, fs, io, path::PathBuf, thread, time::Duration}; -use tokio::{ - runtime, - sync::watch, - time::{timeout_at, Instant}, +use std::{ + fmt, fs, io, + path::PathBuf, + sync::{ + mpsc::{self, Receiver, RecvTimeoutError, Sender}, + Condvar, Mutex, + }, + thread, + time::{Duration, Instant}, }; /// An error that the [`Runtime`] can return. @@ -594,16 +598,38 @@ pub enum Error { /// An auto splitter runtime that allows using an auto splitter provided as a /// WebAssembly module to control a timer. -pub struct Runtime { - interrupt_receiver: watch::Receiver>, - auto_splitter: watch::Sender>>>, +pub struct Runtime { + shared_state: Arc>, + changed_sender: Sender<()>, runtime: livesplit_auto_splitting::Runtime, } -impl Drop for Runtime { +struct SharedState { + auto_splitter: ArcSwapOption>>, + watchdog_state: Mutex, + watchdog_state_update: Condvar, +} + +enum WatchdogState { + Unloaded, + Shutdown, + Tick(Instant), +} + +impl SharedState { + fn update_watchdog(&self, watchdog_state: WatchdogState) -> Result<(), ()> { + *self.watchdog_state.lock().map_err(drop)? = watchdog_state; + self.watchdog_state_update.notify_one(); + Ok(()) + } +} + +impl Drop for Runtime { fn drop(&mut self) { - if let Some(handle) = &*self.interrupt_receiver.borrow() { - handle.interrupt(); + let _ = self.shared_state.update_watchdog(WatchdogState::Shutdown); + if let Some(auto_splitter) = &*self.shared_state.auto_splitter.load() { + let _ = self.unload(); + auto_splitter.interrupt_handle().interrupt(); } } } @@ -618,38 +644,36 @@ impl Runtime { /// Starts the runtime. Doesn't actually load an auto splitter until /// [`load`][Runtime::load] is called. pub fn new() -> Self { - let (sender, receiver) = watch::channel(None); - let (interrupt_sender, interrupt_receiver) = watch::channel(None); - let (timeout_sender, timeout_receiver) = watch::channel(None); + let (changed_sender, changed_receiver) = mpsc::channel(); + let shared_state = Arc::new(SharedState { + auto_splitter: ArcSwapOption::from(None), + watchdog_state: Mutex::new(WatchdogState::Unloaded), + watchdog_state_update: Condvar::new(), + }); thread::Builder::new() .name("Auto Splitting Runtime".into()) - .spawn(move || { - runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap() - .block_on(run(receiver, timeout_sender, interrupt_sender)) + .spawn({ + let shared_state = shared_state.clone(); + move || { + run(shared_state, changed_receiver); + } }) .unwrap(); thread::Builder::new() .name("Auto Splitting Watchdog".into()) .spawn({ - let interrupt_receiver = interrupt_receiver.clone(); + let shared_state = shared_state.clone(); move || { - runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap() - .block_on(watchdog(timeout_receiver, interrupt_receiver)) + watchdog(shared_state); } }) .unwrap(); Self { - interrupt_receiver, - auto_splitter: sender, + shared_state, + changed_sender, // TODO: unwrap? runtime: livesplit_auto_splitting::Runtime::new(Config::default()).unwrap(), } @@ -666,8 +690,12 @@ impl Runtime { .instantiate(Timer(timer), None, None) .map_err(|e| Error::LoadFailed { source: e })?; - self.auto_splitter - .send(Some(auto_splitter)) + self.shared_state + .auto_splitter + .store(Some(Arc::new(auto_splitter))); + + self.changed_sender + .send(()) .map_err(|_| Error::ThreadStopped) } @@ -675,8 +703,10 @@ impl Runtime { /// there isn't currently an auto splitter loaded, only if the runtime /// thread stops unexpectedly. pub fn unload(&self) -> Result<(), Error> { - self.auto_splitter - .send(None) + self.shared_state.auto_splitter.store(None); + + self.changed_sender + .send(()) .map_err(|_| Error::ThreadStopped) } @@ -686,12 +716,22 @@ impl Runtime { /// [`set_settings_map`](Self::set_settings_map) or /// [`set_settings_map_if_unchanged`](Self::set_settings_map_if_unchanged). pub fn settings_map(&self) -> Option { - Some(self.auto_splitter.borrow().as_ref()?.settings_map()) + Some( + self.shared_state + .auto_splitter + .load() + .as_ref()? + .settings_map(), + ) } /// Unconditionally sets the settings map. pub fn set_settings_map(&self, map: settings::Map) -> Option<()> { - self.auto_splitter.borrow().as_ref()?.set_settings_map(map); + self.shared_state + .auto_splitter + .load() + .as_ref()? + .set_settings_map(map); Some(()) } @@ -707,8 +747,9 @@ impl Runtime { new: settings::Map, ) -> Option { Some( - self.auto_splitter - .borrow() + self.shared_state + .auto_splitter + .load() .as_ref()? .set_settings_map_if_unchanged(old, new), ) @@ -722,7 +763,13 @@ impl Runtime { /// tick is complete. Any changes the user does to these widgets should be /// applied to the settings map and stored back. pub fn settings_widgets(&self) -> Option>> { - Some(self.auto_splitter.borrow().as_ref()?.settings_widgets()) + Some( + self.shared_state + .auto_splitter + .load() + .as_ref()? + .settings_widgets(), + ) } } @@ -792,99 +839,118 @@ impl AutoSplitTimer for Timer { } } -async fn run( - mut auto_splitter: watch::Receiver>>>, - timeout_sender: watch::Sender>, - interrupt_sender: watch::Sender>, +fn run( + shared_state: Arc>, + changed_receiver: Receiver<()>, ) { 'back_to_not_having_an_auto_splitter: loop { - interrupt_sender.send(None).ok(); - timeout_sender.send(None).ok(); + if shared_state + .update_watchdog(WatchdogState::Unloaded) + .is_err() + { + return; + } - let mut next_step = loop { - match auto_splitter.changed().await { - Ok(()) => { - if let Some(auto_splitter) = &*auto_splitter.borrow() { - log::info!(target: "Auto Splitter", "Loaded auto splitter"); - let next_step = Instant::now(); - interrupt_sender - .send(Some(auto_splitter.interrupt_handle())) - .ok(); - timeout_sender.send(Some(next_step)).ok(); - break next_step; - } - } - Err(_) => return, - }; - }; + while shared_state.auto_splitter.load().is_none() { + if changed_receiver.recv().is_err() { + return; + } + } + + log::info!(target: "Auto Splitter", "Loaded auto splitter"); + let mut next_tick = Instant::now(); + + if shared_state + .update_watchdog(WatchdogState::Tick(next_tick)) + .is_err() + { + return; + } loop { - let result = timeout_at(next_step, auto_splitter.changed()).await; - let Some(auto_splitter) = &*auto_splitter.borrow() else { - log::info!(target: "Auto Splitter", "Unloaded auto splitter"); + let result = + changed_receiver.recv_timeout(next_tick.saturating_duration_since(Instant::now())); + + let Some(auto_splitter) = &*shared_state.auto_splitter.load() else { + log::info!(target: "Auto Splitter", "Unloaded"); continue 'back_to_not_having_an_auto_splitter; }; + let auto_splitter = &**auto_splitter; match result { - Ok(Ok(())) => { + Ok(()) => { log::info!(target: "Auto Splitter", "Replaced auto splitter"); - next_step = Instant::now(); - interrupt_sender - .send(Some(auto_splitter.interrupt_handle())) - .ok(); - timeout_sender.send(Some(next_step)).ok(); - } - Ok(Err(_)) => return, - Err(_) => { - let result = auto_splitter.lock().update(); - match result { - Ok(()) => { - next_step = next_step - .into_std() - .checked_add(auto_splitter.tick_rate()) - .map_or(next_step, |t| t.into()); - - timeout_sender.send(Some(next_step)).ok(); - } - Err(e) => { - log::error!(target: "Auto Splitter", "Unloaded, because the script trapped: {:?}", e); - continue 'back_to_not_having_an_auto_splitter; - } + next_tick = Instant::now(); + if shared_state + .update_watchdog(WatchdogState::Tick(next_tick)) + .is_err() + { + return; } } + Err(RecvTimeoutError::Disconnected) => return, + Err(RecvTimeoutError::Timeout) => { + // Actually the default happy path. + } + } + + // Intentionally not part of the if let to ensure the lock is + // released early. + let result = auto_splitter.lock().update(); + + if let Err(e) = result { + shared_state.auto_splitter.store(None); + log::error!(target: "Auto Splitter", "Unloaded, because the script trapped: {:?}", e); + continue 'back_to_not_having_an_auto_splitter; + } + + next_tick = next_tick + .checked_add(auto_splitter.tick_rate()) + .unwrap_or(next_tick); + + if shared_state + .update_watchdog(WatchdogState::Tick(next_tick)) + .is_err() + { + return; } } } } -async fn watchdog( - mut timeout_receiver: watch::Receiver>, - interrupt_receiver: watch::Receiver>, -) { +fn watchdog(shared_state: Arc>) { const TIMEOUT: Duration = Duration::from_secs(5); + let Ok(mut state) = shared_state.watchdog_state.lock() else { + return; + }; + loop { - let instant = *timeout_receiver.borrow(); - match instant { - Some(time) => match timeout_at( - time.checked_add(TIMEOUT).unwrap_or(time), - timeout_receiver.changed(), - ) - .await - { - Ok(Ok(_)) => {} - Ok(Err(_)) => return, - Err(_) => { - if let Some(handle) = &*interrupt_receiver.borrow() { - handle.interrupt(); - } - } + state = match *state { + WatchdogState::Unloaded => match shared_state.watchdog_state_update.wait(state) { + Ok(new_state) => new_state, + _ => return, }, - None => { - if timeout_receiver.changed().await.is_err() { + WatchdogState::Shutdown => return, + WatchdogState::Tick(next_tick) => { + let timeout_instant = next_tick.checked_add(TIMEOUT).unwrap_or(next_tick); + let timeout_duration = timeout_instant.saturating_duration_since(Instant::now()); + + let Ok((new_state, result)) = shared_state + .watchdog_state_update + .wait_timeout(state, timeout_duration) + else { return; + }; + + if result.timed_out() { + if let Some(auto_splitter) = &*shared_state.auto_splitter.load() { + auto_splitter.interrupt_handle().interrupt(); + } } + + new_state } - } + }; } }