Skip to content

Commit

Permalink
Monitor work.
Browse files Browse the repository at this point in the history
  • Loading branch information
dbittman committed Jul 23, 2024
1 parent c317856 commit d661897
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 5 deletions.
3 changes: 3 additions & 0 deletions src/runtime/monitor/src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use twizzler_runtime_api::ObjID;

pub const MONITOR_INSTANCE_ID: ObjID = ObjID::new(0);
2 changes: 2 additions & 0 deletions src/runtime/monitor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![feature(thread_local)]
#![feature(c_str_literals)]
#![feature(new_uninit)]
#![feature(hash_extract_if)]

use std::sync::{Arc, Mutex};

Expand Down Expand Up @@ -29,6 +30,7 @@ mod state;
mod thread;
mod upcall;

mod api;
mod mon;

#[path = "../secapi/gates.rs"]
Expand Down
6 changes: 3 additions & 3 deletions src/runtime/monitor/src/mon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use happylock::RwLock;

use self::space::Unmapper;

mod compartment;
mod space;
mod thread;
pub(crate) mod compartment;
pub(crate) mod space;
pub(crate) mod thread;

pub struct Monitor {
space: RwLock<space::Space>,
Expand Down
3 changes: 2 additions & 1 deletion src/runtime/monitor/src/mon/space.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ use twizzler_abi::syscall::{sys_object_map, sys_object_unmap, UnmapFlags};
use twizzler_object::Protections;
use twizzler_runtime_api::{MapError, MapFlags, ObjID};

use self::handle::{MapHandle, MapHandleInner};
use self::handle::MapHandleInner;

mod handle;
mod unmapper;

pub use handle::MapHandle;
pub use unmapper::Unmapper;

#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Ord, Eq, Hash)]
Expand Down
166 changes: 165 additions & 1 deletion src/runtime/monitor/src/mon/thread.rs
Original file line number Diff line number Diff line change
@@ -1 +1,165 @@
pub struct ThreadMgr {}
use std::{collections::HashMap, mem::MaybeUninit, sync::Arc};

use dynlink::tls::TlsRegion;
use twizzler_abi::{
object::NULLPAGE_SIZE,
syscall::{sys_spawn, sys_thread_exit, ThreadSyncSleep, UpcallTargetSpawnOption},
thread::ThreadRepr,
upcall::{UpcallFlags, UpcallInfo, UpcallMode, UpcallOptions, UpcallTarget},
};
use twizzler_runtime_api::{ObjID, SpawnError};

use super::space::MapHandle;
use crate::{api::MONITOR_INSTANCE_ID, thread::SUPER_UPCALL_STACK_SIZE};

mod cleaner;

pub struct ThreadMgr {
all: HashMap<ObjID, ManagedThread>,
cleaner: cleaner::ThreadCleaner,
}

impl ThreadMgr {
fn do_remove(&mut self, thread: &ManagedThread) {
todo!()
}

unsafe fn spawn_thread(
start: usize,
super_stack_start: usize,
super_thread_pointer: usize,
arg: usize,
) -> Result<ObjID, SpawnError> {
let upcall_target = UpcallTarget::new(
None,
Some(twz_rt::rr_upcall_entry),
super_stack_start,
SUPER_UPCALL_STACK_SIZE,
super_thread_pointer,
MONITOR_INSTANCE_ID,
[UpcallOptions {
flags: UpcallFlags::empty(),
mode: UpcallMode::CallSuper,
}; UpcallInfo::NR_UPCALLS],
);

sys_spawn(twizzler_abi::syscall::ThreadSpawnArgs {
entry: start,
stack_base: super_stack_start,
stack_size: SUPER_UPCALL_STACK_SIZE,
tls: super_thread_pointer,
arg,
flags: twizzler_abi::syscall::ThreadSpawnFlags::empty(),
vm_context_handle: None,
upcall_target: UpcallTargetSpawnOption::SetTo(upcall_target),
})
.map_err(|_| SpawnError::KernelError)
}

fn do_spawn(start: unsafe extern "C" fn(usize) -> !, arg: usize) -> Result<Self, SpawnError> {
/*
let mut cm = COMPMAN.lock();
let mon_comp = cm.get_monitor_dynlink_compartment();
let super_tls = mon_comp
.build_tls_region(RuntimeThreadControl::default(), |layout| unsafe {
NonNull::new(std::alloc::alloc_zeroed(layout))
})
.map_err(|_| SpawnError::Other)?;
drop(cm);
let super_thread_pointer = super_tls.get_thread_pointer_value();
let super_stack = Box::new_zeroed_slice(SUPER_UPCALL_STACK_SIZE);
// Safety: we are allocating and tracking both the stack and the tls region for greater than
// the lifetime of this thread. The start entry points to our given start function.
let id = unsafe {
Self::spawn_thread(
start as *const () as usize,
super_stack.as_ptr() as usize,
super_thread_pointer,
arg,
)
}?;
// TODO
let repr = crate::mapman::map_object(MapInfo {
id,
flags: MapFlags::READ,
})
.unwrap();
Ok(Self {
id,
super_stack,
super_tls,
repr: ManagedThreadRepr::new(repr),
})
*/
todo!()
}

fn do_start(main: Box<dyn FnOnce()>) -> Result<Self, SpawnError> {
let main_addr = Box::into_raw(Box::new(main)) as usize;
unsafe extern "C" fn managed_thread_entry(main: usize) -> ! {
{
let main = Box::from_raw(main as *mut Box<dyn FnOnce()>);
main();
}

sys_thread_exit(0);
}

Self::do_spawn(managed_thread_entry, main_addr)
}
}

#[allow(dead_code)]
pub struct ManagedThreadInner {
pub id: ObjID,
pub(crate) repr: ManagedThreadRepr,
super_stack: Box<[MaybeUninit<u8>]>,
super_tls: TlsRegion,
}

impl ManagedThreadInner {
pub fn has_exited(&self) -> bool {
todo!()
}

pub fn waitable_until_exit(&self) -> ThreadSyncSleep {
todo!()
}
}

// Safety: TlsRegion is not changed, and points to only globally- and permanently-allocated data.
unsafe impl Send for ManagedThreadInner {}
unsafe impl Sync for ManagedThreadInner {}

impl core::fmt::Debug for ManagedThreadInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ManagedThread({})", self.id)
}
}

impl Drop for ManagedThreadInner {
fn drop(&mut self) {
tracing::trace!("dropping ManagedThread {}", self.id);
}
}

pub type ManagedThread = Arc<ManagedThreadInner>;

pub(crate) struct ManagedThreadRepr {
handle: MapHandle,
}

impl ManagedThreadRepr {
fn new(handle: MapHandle) -> Self {
Self { handle }
}

pub fn get_repr(&self) -> &ThreadRepr {
let addr = self.handle.addrs().start + NULLPAGE_SIZE;
unsafe { (addr as *const ThreadRepr).as_ref().unwrap() }
}
}
148 changes: 148 additions & 0 deletions src/runtime/monitor/src/mon/thread/cleaner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use std::{
collections::HashMap,
marker::PhantomPinned,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
mpsc::{Receiver, Sender},
Arc,
},
};

use twizzler_abi::syscall::{
sys_thread_sync, ThreadSync, ThreadSyncFlags, ThreadSyncOp, ThreadSyncReference,
ThreadSyncSleep, ThreadSyncWake,
};
use twizzler_runtime_api::ObjID;

use super::ManagedThread;
use crate::mon::get_monitor;

pub(super) struct ThreadCleaner {
thread: std::thread::JoinHandle<()>,
send: Sender<WaitOp>,
inner: Pin<Arc<ThreadCleanerData>>,
}

#[derive(Default)]
struct ThreadCleanerData {
notify: AtomicU64,
_unpin: PhantomPinned,
}

#[derive(Default)]
struct Waits {
threads: HashMap<ObjID, ManagedThread>,
}

// Changes to the collection of threads we are tracking
enum WaitOp {
Add(ManagedThread),
Remove(ObjID),
}

impl ThreadCleaner {
pub(super) fn new() -> Self {
let (send, recv) = std::sync::mpsc::channel();
let data = Arc::pin(ThreadCleanerData::default());
let inner = data.clone();
let thread = std::thread::Builder::new()
.name("thread-exit cleanup tracker".into())
.spawn(move || cleaner_thread_main(data, recv))
.unwrap();
Self {
send,
inner,
thread,
}
}

/// Track a thread. If that thread exits, the cleanup thread will remove the exited thread from
/// tracking and from the global thread manager.
pub fn track(&self, th: ManagedThread) {
tracing::debug!("tracking thread {}", th.id);
let _ = self.send.send(WaitOp::Add(th));
self.inner.notify();
}

/// Untrack a thread. Threads removed this way do not trigger a removal from the global thread
/// manager.
pub fn untrack(&self, id: ObjID) {
let _ = self.send.send(WaitOp::Remove(id));
self.inner.notify();
}
}

impl ThreadCleanerData {
/// Notify the cleanup thread that new items are on the queue.
fn notify(&self) {
self.notify.store(1, Ordering::SeqCst);
let mut ops = [ThreadSync::new_wake(ThreadSyncWake::new(
ThreadSyncReference::Virtual(&self.notify),
1,
))];
if let Err(e) = sys_thread_sync(&mut ops, None) {
tracing::warn!("thread sync error when trying to notify: {}", e);
}
}
}

impl Waits {
fn process_queue(&mut self, recv: &mut Receiver<WaitOp>) {
while let Ok(wo) = recv.try_recv() {
match wo {
WaitOp::Add(th) => {
self.threads.insert(th.id, th);
}
WaitOp::Remove(id) => {
self.threads.remove(&id);
}
}
}
}
}

#[tracing::instrument(skip(data, recv))]
fn cleaner_thread_main(data: Pin<Arc<ThreadCleanerData>>, mut recv: Receiver<WaitOp>) {
// TODO (dbittman): when we have support for async thread events, we can use that API.
let mut ops = Vec::new();
let mut cleanups = Vec::new();
let mut waits = Waits::default();
let mut key = happylock::ThreadKey::get().unwrap();
loop {
ops.truncate(0);
// Apply any waiting operations.
waits.process_queue(&mut recv);

// Add the notify sleep op.
ops.push(ThreadSync::new_sleep(ThreadSyncSleep::new(
ThreadSyncReference::Virtual(&data.notify),
0,
ThreadSyncOp::Equal,
ThreadSyncFlags::empty(),
)));

// Add all sleep ops for threads.
cleanups.extend(waits.threads.extract_if(|_, th| th.has_exited()));
for th in waits.threads.values() {
ops.push(ThreadSync::new_sleep(th.waitable_until_exit()));
}

// Remove any exited threads from the thread manager.
for (_, th) in cleanups.drain(..) {
tracing::debug!("cleaning thread: {}", th.id);
let monitor = get_monitor();
let mut tmgr = monitor.thread_mgr.write(&mut key);
tmgr.do_remove(&th);
}

// Check for notifications, and sleep.
if data.notify.swap(0, Ordering::SeqCst) == 0 {
// no notification, go to sleep. hold the lock over the sleep so that someone cannot
// modify waits.threads on us while we're asleep.
if let Err(e) = sys_thread_sync(&mut ops, None) {
tracing::warn!("thread sync error: {}", e);
}
}
}
}

0 comments on commit d661897

Please sign in to comment.