From d6618974ac7790c68bae41c8ea651f034bb8f396 Mon Sep 17 00:00:00 2001 From: Daniel Bittman Date: Mon, 22 Jul 2024 18:16:24 -0700 Subject: [PATCH] Monitor work. --- src/runtime/monitor/src/api.rs | 3 + src/runtime/monitor/src/lib.rs | 2 + src/runtime/monitor/src/mon/mod.rs | 6 +- src/runtime/monitor/src/mon/space.rs | 3 +- src/runtime/monitor/src/mon/thread.rs | 166 +++++++++++++++++- src/runtime/monitor/src/mon/thread/cleaner.rs | 148 ++++++++++++++++ 6 files changed, 323 insertions(+), 5 deletions(-) create mode 100644 src/runtime/monitor/src/api.rs create mode 100644 src/runtime/monitor/src/mon/thread/cleaner.rs diff --git a/src/runtime/monitor/src/api.rs b/src/runtime/monitor/src/api.rs new file mode 100644 index 000000000..e8745ae27 --- /dev/null +++ b/src/runtime/monitor/src/api.rs @@ -0,0 +1,3 @@ +use twizzler_runtime_api::ObjID; + +pub const MONITOR_INSTANCE_ID: ObjID = ObjID::new(0); diff --git a/src/runtime/monitor/src/lib.rs b/src/runtime/monitor/src/lib.rs index 36d398233..7e37e1db0 100644 --- a/src/runtime/monitor/src/lib.rs +++ b/src/runtime/monitor/src/lib.rs @@ -2,6 +2,7 @@ #![feature(thread_local)] #![feature(c_str_literals)] #![feature(new_uninit)] +#![feature(hash_extract_if)] use std::sync::{Arc, Mutex}; @@ -29,6 +30,7 @@ mod state; mod thread; mod upcall; +mod api; mod mon; #[path = "../secapi/gates.rs"] diff --git a/src/runtime/monitor/src/mon/mod.rs b/src/runtime/monitor/src/mon/mod.rs index fb5341f59..335453abf 100644 --- a/src/runtime/monitor/src/mon/mod.rs +++ b/src/runtime/monitor/src/mon/mod.rs @@ -4,9 +4,9 @@ use happylock::RwLock; use self::space::Unmapper; -mod compartment; -mod space; -mod thread; +pub(crate) mod compartment; +pub(crate) mod space; +pub(crate) mod thread; pub struct Monitor { space: RwLock, diff --git a/src/runtime/monitor/src/mon/space.rs b/src/runtime/monitor/src/mon/space.rs index 74f4f8a20..bdb58b2f0 100644 --- a/src/runtime/monitor/src/mon/space.rs +++ b/src/runtime/monitor/src/mon/space.rs @@ -5,11 +5,12 @@ use twizzler_abi::syscall::{sys_object_map, sys_object_unmap, UnmapFlags}; use twizzler_object::Protections; use twizzler_runtime_api::{MapError, MapFlags, ObjID}; -use self::handle::{MapHandle, MapHandleInner}; +use self::handle::MapHandleInner; mod handle; mod unmapper; +pub use handle::MapHandle; pub use unmapper::Unmapper; #[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Ord, Eq, Hash)] diff --git a/src/runtime/monitor/src/mon/thread.rs b/src/runtime/monitor/src/mon/thread.rs index 2e7967aa6..c18c04083 100644 --- a/src/runtime/monitor/src/mon/thread.rs +++ b/src/runtime/monitor/src/mon/thread.rs @@ -1 +1,165 @@ -pub struct ThreadMgr {} +use std::{collections::HashMap, mem::MaybeUninit, sync::Arc}; + +use dynlink::tls::TlsRegion; +use twizzler_abi::{ + object::NULLPAGE_SIZE, + syscall::{sys_spawn, sys_thread_exit, ThreadSyncSleep, UpcallTargetSpawnOption}, + thread::ThreadRepr, + upcall::{UpcallFlags, UpcallInfo, UpcallMode, UpcallOptions, UpcallTarget}, +}; +use twizzler_runtime_api::{ObjID, SpawnError}; + +use super::space::MapHandle; +use crate::{api::MONITOR_INSTANCE_ID, thread::SUPER_UPCALL_STACK_SIZE}; + +mod cleaner; + +pub struct ThreadMgr { + all: HashMap, + cleaner: cleaner::ThreadCleaner, +} + +impl ThreadMgr { + fn do_remove(&mut self, thread: &ManagedThread) { + todo!() + } + + unsafe fn spawn_thread( + start: usize, + super_stack_start: usize, + super_thread_pointer: usize, + arg: usize, + ) -> Result { + let upcall_target = UpcallTarget::new( + None, + Some(twz_rt::rr_upcall_entry), + super_stack_start, + SUPER_UPCALL_STACK_SIZE, + super_thread_pointer, + MONITOR_INSTANCE_ID, + [UpcallOptions { + flags: UpcallFlags::empty(), + mode: UpcallMode::CallSuper, + }; UpcallInfo::NR_UPCALLS], + ); + + sys_spawn(twizzler_abi::syscall::ThreadSpawnArgs { + entry: start, + stack_base: super_stack_start, + stack_size: SUPER_UPCALL_STACK_SIZE, + tls: super_thread_pointer, + arg, + flags: twizzler_abi::syscall::ThreadSpawnFlags::empty(), + vm_context_handle: None, + upcall_target: UpcallTargetSpawnOption::SetTo(upcall_target), + }) + .map_err(|_| SpawnError::KernelError) + } + + fn do_spawn(start: unsafe extern "C" fn(usize) -> !, arg: usize) -> Result { + /* + let mut cm = COMPMAN.lock(); + let mon_comp = cm.get_monitor_dynlink_compartment(); + let super_tls = mon_comp + .build_tls_region(RuntimeThreadControl::default(), |layout| unsafe { + NonNull::new(std::alloc::alloc_zeroed(layout)) + }) + .map_err(|_| SpawnError::Other)?; + drop(cm); + let super_thread_pointer = super_tls.get_thread_pointer_value(); + + let super_stack = Box::new_zeroed_slice(SUPER_UPCALL_STACK_SIZE); + + // Safety: we are allocating and tracking both the stack and the tls region for greater than + // the lifetime of this thread. The start entry points to our given start function. + let id = unsafe { + Self::spawn_thread( + start as *const () as usize, + super_stack.as_ptr() as usize, + super_thread_pointer, + arg, + ) + }?; + + // TODO + let repr = crate::mapman::map_object(MapInfo { + id, + flags: MapFlags::READ, + }) + .unwrap(); + + Ok(Self { + id, + super_stack, + super_tls, + repr: ManagedThreadRepr::new(repr), + }) + */ + todo!() + } + + fn do_start(main: Box) -> Result { + let main_addr = Box::into_raw(Box::new(main)) as usize; + unsafe extern "C" fn managed_thread_entry(main: usize) -> ! { + { + let main = Box::from_raw(main as *mut Box); + main(); + } + + sys_thread_exit(0); + } + + Self::do_spawn(managed_thread_entry, main_addr) + } +} + +#[allow(dead_code)] +pub struct ManagedThreadInner { + pub id: ObjID, + pub(crate) repr: ManagedThreadRepr, + super_stack: Box<[MaybeUninit]>, + super_tls: TlsRegion, +} + +impl ManagedThreadInner { + pub fn has_exited(&self) -> bool { + todo!() + } + + pub fn waitable_until_exit(&self) -> ThreadSyncSleep { + todo!() + } +} + +// Safety: TlsRegion is not changed, and points to only globally- and permanently-allocated data. +unsafe impl Send for ManagedThreadInner {} +unsafe impl Sync for ManagedThreadInner {} + +impl core::fmt::Debug for ManagedThreadInner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "ManagedThread({})", self.id) + } +} + +impl Drop for ManagedThreadInner { + fn drop(&mut self) { + tracing::trace!("dropping ManagedThread {}", self.id); + } +} + +pub type ManagedThread = Arc; + +pub(crate) struct ManagedThreadRepr { + handle: MapHandle, +} + +impl ManagedThreadRepr { + fn new(handle: MapHandle) -> Self { + Self { handle } + } + + pub fn get_repr(&self) -> &ThreadRepr { + let addr = self.handle.addrs().start + NULLPAGE_SIZE; + unsafe { (addr as *const ThreadRepr).as_ref().unwrap() } + } +} diff --git a/src/runtime/monitor/src/mon/thread/cleaner.rs b/src/runtime/monitor/src/mon/thread/cleaner.rs new file mode 100644 index 000000000..007ee0a9c --- /dev/null +++ b/src/runtime/monitor/src/mon/thread/cleaner.rs @@ -0,0 +1,148 @@ +use std::{ + collections::HashMap, + marker::PhantomPinned, + pin::Pin, + sync::{ + atomic::{AtomicU64, Ordering}, + mpsc::{Receiver, Sender}, + Arc, + }, +}; + +use twizzler_abi::syscall::{ + sys_thread_sync, ThreadSync, ThreadSyncFlags, ThreadSyncOp, ThreadSyncReference, + ThreadSyncSleep, ThreadSyncWake, +}; +use twizzler_runtime_api::ObjID; + +use super::ManagedThread; +use crate::mon::get_monitor; + +pub(super) struct ThreadCleaner { + thread: std::thread::JoinHandle<()>, + send: Sender, + inner: Pin>, +} + +#[derive(Default)] +struct ThreadCleanerData { + notify: AtomicU64, + _unpin: PhantomPinned, +} + +#[derive(Default)] +struct Waits { + threads: HashMap, +} + +// Changes to the collection of threads we are tracking +enum WaitOp { + Add(ManagedThread), + Remove(ObjID), +} + +impl ThreadCleaner { + pub(super) fn new() -> Self { + let (send, recv) = std::sync::mpsc::channel(); + let data = Arc::pin(ThreadCleanerData::default()); + let inner = data.clone(); + let thread = std::thread::Builder::new() + .name("thread-exit cleanup tracker".into()) + .spawn(move || cleaner_thread_main(data, recv)) + .unwrap(); + Self { + send, + inner, + thread, + } + } + + /// Track a thread. If that thread exits, the cleanup thread will remove the exited thread from + /// tracking and from the global thread manager. + pub fn track(&self, th: ManagedThread) { + tracing::debug!("tracking thread {}", th.id); + let _ = self.send.send(WaitOp::Add(th)); + self.inner.notify(); + } + + /// Untrack a thread. Threads removed this way do not trigger a removal from the global thread + /// manager. + pub fn untrack(&self, id: ObjID) { + let _ = self.send.send(WaitOp::Remove(id)); + self.inner.notify(); + } +} + +impl ThreadCleanerData { + /// Notify the cleanup thread that new items are on the queue. + fn notify(&self) { + self.notify.store(1, Ordering::SeqCst); + let mut ops = [ThreadSync::new_wake(ThreadSyncWake::new( + ThreadSyncReference::Virtual(&self.notify), + 1, + ))]; + if let Err(e) = sys_thread_sync(&mut ops, None) { + tracing::warn!("thread sync error when trying to notify: {}", e); + } + } +} + +impl Waits { + fn process_queue(&mut self, recv: &mut Receiver) { + while let Ok(wo) = recv.try_recv() { + match wo { + WaitOp::Add(th) => { + self.threads.insert(th.id, th); + } + WaitOp::Remove(id) => { + self.threads.remove(&id); + } + } + } + } +} + +#[tracing::instrument(skip(data, recv))] +fn cleaner_thread_main(data: Pin>, mut recv: Receiver) { + // TODO (dbittman): when we have support for async thread events, we can use that API. + let mut ops = Vec::new(); + let mut cleanups = Vec::new(); + let mut waits = Waits::default(); + let mut key = happylock::ThreadKey::get().unwrap(); + loop { + ops.truncate(0); + // Apply any waiting operations. + waits.process_queue(&mut recv); + + // Add the notify sleep op. + ops.push(ThreadSync::new_sleep(ThreadSyncSleep::new( + ThreadSyncReference::Virtual(&data.notify), + 0, + ThreadSyncOp::Equal, + ThreadSyncFlags::empty(), + ))); + + // Add all sleep ops for threads. + cleanups.extend(waits.threads.extract_if(|_, th| th.has_exited())); + for th in waits.threads.values() { + ops.push(ThreadSync::new_sleep(th.waitable_until_exit())); + } + + // Remove any exited threads from the thread manager. + for (_, th) in cleanups.drain(..) { + tracing::debug!("cleaning thread: {}", th.id); + let monitor = get_monitor(); + let mut tmgr = monitor.thread_mgr.write(&mut key); + tmgr.do_remove(&th); + } + + // Check for notifications, and sleep. + if data.notify.swap(0, Ordering::SeqCst) == 0 { + // no notification, go to sleep. hold the lock over the sleep so that someone cannot + // modify waits.threads on us while we're asleep. + if let Err(e) = sys_thread_sync(&mut ops, None) { + tracing::warn!("thread sync error: {}", e); + } + } + } +}