From 6793601b89be71fb63b7317de98d8238444ec258 Mon Sep 17 00:00:00 2001 From: Christopher Serr Date: Sat, 16 Nov 2024 12:46:11 +0100 Subject: [PATCH] Remove 2nd Tokio Runtime in Auto Splitting Runtime Wasmtime's WASI implementation nowadays internally has its own Tokio runtime. You can only ever run a single Tokio runtime on a thread at the same time. Wasmtime provides async support, so this likely would have also solved the issue. However, this probably would've made it harder to use the runtime in other situations. So instead we just remove our own runtime, which was not really necessary anyway. --- Cargo.toml | 8 +- .../src/runtime/mod.rs | 6 +- src/auto_splitting/mod.rs | 284 +++++++++++------- 3 files changed, 180 insertions(+), 118 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4d7ed33e..db915b20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,11 +91,7 @@ splits-io-api = { version = "0.4.0", optional = true } # Auto Splitting livesplit-auto-splitting = { path = "crates/livesplit-auto-splitting", version = "0.1.0", optional = true } -tokio = { version = "1.24.2", default-features = false, features = [ - "rt", - "sync", - "time", -], optional = true } +arc-swap = { version = "1.7.1", optional = true } log = { version = "0.4.14", default-features = false, optional = true } [target.'cfg(all(target_family = "wasm", target_os = "unknown"))'.dependencies] @@ -191,7 +187,7 @@ wasm-web = [ "web-sys", ] networking = ["std", "splits-io-api"] -auto-splitting = ["std", "livesplit-auto-splitting", "tokio", "log"] +auto-splitting = ["std", "livesplit-auto-splitting", "arc-swap", "log"] [lib] bench = false diff --git a/crates/livesplit-auto-splitting/src/runtime/mod.rs b/crates/livesplit-auto-splitting/src/runtime/mod.rs index f9fe891b..d47b77d6 100644 --- a/crates/livesplit-auto-splitting/src/runtime/mod.rs +++ b/crates/livesplit-auto-splitting/src/runtime/mod.rs @@ -190,7 +190,7 @@ fn single_process() -> ProcessRefreshKind { #[non_exhaustive] pub struct Config { /// This enables debug information for the WebAssembly module. This is - /// useful for debugging purposes. By default this `true` if the feature + /// useful for debugging purposes. By default this is `true` if the feature /// `debugger-support` is enabled. pub debug_info: bool, /// This enables optimizations for the WebAssembly module. This is enabled @@ -198,8 +198,8 @@ pub struct Config { /// splitter. pub optimize: bool, /// This enables backtrace details for the WebAssembly module. If a trap - /// occurs more details are printed in the backtrace. By default this `true` - /// if the feature `enhanced-backtrace` is enabled. + /// occurs more details are printed in the backtrace. By default this is + /// `true` if the feature `enhanced-backtrace` is enabled. pub backtrace_details: bool, } diff --git a/src/auto_splitting/mod.rs b/src/auto_splitting/mod.rs index 49f23440..a14bdb62 100644 --- a/src/auto_splitting/mod.rs +++ b/src/auto_splitting/mod.rs @@ -556,17 +556,21 @@ use crate::{ platform::Arc, timing::TimerPhase, }; +use arc_swap::ArcSwapOption; pub use livesplit_auto_splitting::{settings, wasi_path}; use livesplit_auto_splitting::{ - AutoSplitter, Config, CreationError, InterruptHandle, LogLevel, Timer as AutoSplitTimer, - TimerState, + AutoSplitter, Config, CreationError, LogLevel, Timer as AutoSplitTimer, TimerState, }; use snafu::Snafu; -use std::{fmt, fs, io, path::PathBuf, thread, time::Duration}; -use tokio::{ - runtime, - sync::watch, - time::{timeout_at, Instant}, +use std::{ + fmt, fs, io, + path::PathBuf, + sync::{ + mpsc::{self, Receiver, RecvTimeoutError, Sender}, + Condvar, Mutex, + }, + thread, + time::{Duration, Instant}, }; /// An error that the [`Runtime`] can return. @@ -594,16 +598,38 @@ pub enum Error { /// An auto splitter runtime that allows using an auto splitter provided as a /// WebAssembly module to control a timer. -pub struct Runtime { - interrupt_receiver: watch::Receiver>, - auto_splitter: watch::Sender>>>, +pub struct Runtime { + shared_state: Arc>, + changed_sender: Sender<()>, runtime: livesplit_auto_splitting::Runtime, } -impl Drop for Runtime { +struct SharedState { + auto_splitter: ArcSwapOption>>, + watchdog_state: Mutex, + watchdog_state_update: Condvar, +} + +enum WatchdogState { + Unloaded, + Shutdown, + Tick(Instant), +} + +impl SharedState { + fn update_watchdog(&self, watchdog_state: WatchdogState) -> Result<(), ()> { + *self.watchdog_state.lock().map_err(drop)? = watchdog_state; + self.watchdog_state_update.notify_one(); + Ok(()) + } +} + +impl Drop for Runtime { fn drop(&mut self) { - if let Some(handle) = &*self.interrupt_receiver.borrow() { - handle.interrupt(); + let _ = self.shared_state.update_watchdog(WatchdogState::Shutdown); + if let Some(auto_splitter) = &*self.shared_state.auto_splitter.load() { + let _ = self.unload(); + auto_splitter.interrupt_handle().interrupt(); } } } @@ -618,38 +644,36 @@ impl Runtime { /// Starts the runtime. Doesn't actually load an auto splitter until /// [`load`][Runtime::load] is called. pub fn new() -> Self { - let (sender, receiver) = watch::channel(None); - let (interrupt_sender, interrupt_receiver) = watch::channel(None); - let (timeout_sender, timeout_receiver) = watch::channel(None); + let (changed_sender, changed_receiver) = mpsc::channel(); + let shared_state = Arc::new(SharedState { + auto_splitter: ArcSwapOption::from(None), + watchdog_state: Mutex::new(WatchdogState::Unloaded), + watchdog_state_update: Condvar::new(), + }); thread::Builder::new() .name("Auto Splitting Runtime".into()) - .spawn(move || { - runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap() - .block_on(run(receiver, timeout_sender, interrupt_sender)) + .spawn({ + let shared_state = shared_state.clone(); + move || { + run(shared_state, changed_receiver); + } }) .unwrap(); thread::Builder::new() .name("Auto Splitting Watchdog".into()) .spawn({ - let interrupt_receiver = interrupt_receiver.clone(); + let shared_state = shared_state.clone(); move || { - runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap() - .block_on(watchdog(timeout_receiver, interrupt_receiver)) + watchdog(shared_state); } }) .unwrap(); Self { - interrupt_receiver, - auto_splitter: sender, + shared_state, + changed_sender, // TODO: unwrap? runtime: livesplit_auto_splitting::Runtime::new(Config::default()).unwrap(), } @@ -666,8 +690,12 @@ impl Runtime { .instantiate(Timer(timer), None, None) .map_err(|e| Error::LoadFailed { source: e })?; - self.auto_splitter - .send(Some(auto_splitter)) + self.shared_state + .auto_splitter + .store(Some(Arc::new(auto_splitter))); + + self.changed_sender + .send(()) .map_err(|_| Error::ThreadStopped) } @@ -675,8 +703,10 @@ impl Runtime { /// there isn't currently an auto splitter loaded, only if the runtime /// thread stops unexpectedly. pub fn unload(&self) -> Result<(), Error> { - self.auto_splitter - .send(None) + self.shared_state.auto_splitter.store(None); + + self.changed_sender + .send(()) .map_err(|_| Error::ThreadStopped) } @@ -686,12 +716,22 @@ impl Runtime { /// [`set_settings_map`](Self::set_settings_map) or /// [`set_settings_map_if_unchanged`](Self::set_settings_map_if_unchanged). pub fn settings_map(&self) -> Option { - Some(self.auto_splitter.borrow().as_ref()?.settings_map()) + Some( + self.shared_state + .auto_splitter + .load() + .as_ref()? + .settings_map(), + ) } /// Unconditionally sets the settings map. pub fn set_settings_map(&self, map: settings::Map) -> Option<()> { - self.auto_splitter.borrow().as_ref()?.set_settings_map(map); + self.shared_state + .auto_splitter + .load() + .as_ref()? + .set_settings_map(map); Some(()) } @@ -707,8 +747,9 @@ impl Runtime { new: settings::Map, ) -> Option { Some( - self.auto_splitter - .borrow() + self.shared_state + .auto_splitter + .load() .as_ref()? .set_settings_map_if_unchanged(old, new), ) @@ -722,7 +763,13 @@ impl Runtime { /// tick is complete. Any changes the user does to these widgets should be /// applied to the settings map and stored back. pub fn settings_widgets(&self) -> Option>> { - Some(self.auto_splitter.borrow().as_ref()?.settings_widgets()) + Some( + self.shared_state + .auto_splitter + .load() + .as_ref()? + .settings_widgets(), + ) } } @@ -792,99 +839,118 @@ impl AutoSplitTimer for Timer { } } -async fn run( - mut auto_splitter: watch::Receiver>>>, - timeout_sender: watch::Sender>, - interrupt_sender: watch::Sender>, +fn run( + shared_state: Arc>, + changed_receiver: Receiver<()>, ) { 'back_to_not_having_an_auto_splitter: loop { - interrupt_sender.send(None).ok(); - timeout_sender.send(None).ok(); + if shared_state + .update_watchdog(WatchdogState::Unloaded) + .is_err() + { + return; + } - let mut next_step = loop { - match auto_splitter.changed().await { - Ok(()) => { - if let Some(auto_splitter) = &*auto_splitter.borrow() { - log::info!(target: "Auto Splitter", "Loaded auto splitter"); - let next_step = Instant::now(); - interrupt_sender - .send(Some(auto_splitter.interrupt_handle())) - .ok(); - timeout_sender.send(Some(next_step)).ok(); - break next_step; - } - } - Err(_) => return, - }; - }; + while shared_state.auto_splitter.load().is_none() { + if changed_receiver.recv().is_err() { + return; + } + } + + log::info!(target: "Auto Splitter", "Loaded auto splitter"); + let mut next_tick = Instant::now(); + + if shared_state + .update_watchdog(WatchdogState::Tick(next_tick)) + .is_err() + { + return; + } loop { - let result = timeout_at(next_step, auto_splitter.changed()).await; - let Some(auto_splitter) = &*auto_splitter.borrow() else { - log::info!(target: "Auto Splitter", "Unloaded auto splitter"); + let result = + changed_receiver.recv_timeout(next_tick.saturating_duration_since(Instant::now())); + + let Some(auto_splitter) = &*shared_state.auto_splitter.load() else { + log::info!(target: "Auto Splitter", "Unloaded"); continue 'back_to_not_having_an_auto_splitter; }; + let auto_splitter = &**auto_splitter; match result { - Ok(Ok(())) => { + Ok(()) => { log::info!(target: "Auto Splitter", "Replaced auto splitter"); - next_step = Instant::now(); - interrupt_sender - .send(Some(auto_splitter.interrupt_handle())) - .ok(); - timeout_sender.send(Some(next_step)).ok(); - } - Ok(Err(_)) => return, - Err(_) => { - let result = auto_splitter.lock().update(); - match result { - Ok(()) => { - next_step = next_step - .into_std() - .checked_add(auto_splitter.tick_rate()) - .map_or(next_step, |t| t.into()); - - timeout_sender.send(Some(next_step)).ok(); - } - Err(e) => { - log::error!(target: "Auto Splitter", "Unloaded, because the script trapped: {:?}", e); - continue 'back_to_not_having_an_auto_splitter; - } + next_tick = Instant::now(); + if shared_state + .update_watchdog(WatchdogState::Tick(next_tick)) + .is_err() + { + return; } } + Err(RecvTimeoutError::Disconnected) => return, + Err(RecvTimeoutError::Timeout) => { + // Actually the default happy path. + } + } + + // Intentionally not part of the if let to ensure the lock is + // released early. + let result = auto_splitter.lock().update(); + + if let Err(e) = result { + shared_state.auto_splitter.store(None); + log::error!(target: "Auto Splitter", "Unloaded, because the script trapped: {:?}", e); + continue 'back_to_not_having_an_auto_splitter; + } + + next_tick = next_tick + .checked_add(auto_splitter.tick_rate()) + .unwrap_or(next_tick); + + if shared_state + .update_watchdog(WatchdogState::Tick(next_tick)) + .is_err() + { + return; } } } } -async fn watchdog( - mut timeout_receiver: watch::Receiver>, - interrupt_receiver: watch::Receiver>, -) { +fn watchdog(shared_state: Arc>) { const TIMEOUT: Duration = Duration::from_secs(5); + let Ok(mut state) = shared_state.watchdog_state.lock() else { + return; + }; + loop { - let instant = *timeout_receiver.borrow(); - match instant { - Some(time) => match timeout_at( - time.checked_add(TIMEOUT).unwrap_or(time), - timeout_receiver.changed(), - ) - .await - { - Ok(Ok(_)) => {} - Ok(Err(_)) => return, - Err(_) => { - if let Some(handle) = &*interrupt_receiver.borrow() { - handle.interrupt(); - } - } + state = match *state { + WatchdogState::Unloaded => match shared_state.watchdog_state_update.wait(state) { + Ok(new_state) => new_state, + _ => return, }, - None => { - if timeout_receiver.changed().await.is_err() { + WatchdogState::Shutdown => return, + WatchdogState::Tick(next_tick) => { + let timeout_instant = next_tick.checked_add(TIMEOUT).unwrap_or(next_tick); + let timeout_duration = timeout_instant.saturating_duration_since(Instant::now()); + + let Ok((new_state, result)) = shared_state + .watchdog_state_update + .wait_timeout(state, timeout_duration) + else { return; + }; + + if result.timed_out() { + if let Some(auto_splitter) = &*shared_state.auto_splitter.load() { + auto_splitter.interrupt_handle().interrupt(); + } } + + new_state } - } + }; } }