diff --git a/Cargo.lock b/Cargo.lock index 51dd7bc7..5026db3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3843,11 +3843,13 @@ dependencies = [ name = "pager" version = "0.1.0" dependencies = [ + "async-executor", + "async-io", "async-trait", + "futures", "nvme", "tickv", "twizzler-abi", - "twizzler-async", "twizzler-driver", "twizzler-object", "twizzler-queue", @@ -5335,7 +5337,6 @@ dependencies = [ "lazy_static", "limine", "linked_list_allocator", - "log", "memoffset", "nonoverlapping_interval_tree", "object 0.32.2", @@ -5347,6 +5348,7 @@ dependencies = [ "sha2", "slabmalloc", "smccc", + "stable-vec", "tar-no-std", "tock-registers", "twizzler-abi", @@ -5397,8 +5399,10 @@ dependencies = [ name = "twizzler-queue" version = "0.1.0" dependencies = [ + "async-io", + "futures", "twizzler-abi", - "twizzler-async", + "twizzler-futures", "twizzler-object", "twizzler-queue-raw", ] diff --git a/src/bin/netmgr/src/client_request.rs b/src/bin/netmgr/src/client_request.rs index a50b9fca..ceafc9e2 100644 --- a/src/bin/netmgr/src/client_request.rs +++ b/src/bin/netmgr/src/client_request.rs @@ -13,7 +13,7 @@ pub async fn handle_client_request( handle: &HandleRef, id: u32, request: TxRequest, -) -> Result<(), QueueError> { +) -> Result<(), std::io::Error> { println!("got txreq {:?}", request); let reply = match request { TxRequest::Echo(incoming_data) => { diff --git a/src/bin/pager/Cargo.toml b/src/bin/pager/Cargo.toml index a57eed86..c443bf5c 100644 --- a/src/bin/pager/Cargo.toml +++ b/src/bin/pager/Cargo.toml @@ -9,10 +9,13 @@ edition = "2021" twizzler-abi = { path = "../../lib/twizzler-abi" } twizzler-object = { path = "../../lib/twizzler-object" } twizzler-queue = { path = "../../lib/twizzler-queue" } -twizzler-async = { path = "../../lib/twizzler-async" } +#twizzler-async = { path = "../../lib/twizzler-async" } twizzler-driver = { path = "../../lib/twizzler-driver" } twizzler-runtime = { path = "../../rt" } nvme = { path = "../../lib/nvme-rs" } tickv = { version = "1.0.0" } async-trait = "0.1.66" volatile = "0.5" +async-io = "*" +async-executor = "*" +futures = "*" diff --git a/src/bin/pager/src/main.rs b/src/bin/pager/src/main.rs index 17fab74c..0e106a07 100644 --- a/src/bin/pager/src/main.rs +++ b/src/bin/pager/src/main.rs @@ -1,5 +1,8 @@ -use std::{collections::BTreeMap, time::Duration}; +use std::{collections::BTreeMap, sync::OnceLock, time::Duration}; +use async_executor::{Executor, Task}; +use async_io::Timer; +use futures::executor::block_on; use tickv::{success_codes::SuccessCode, ErrorCode}; use twizzler_abi::pager::{ CompletionToKernel, CompletionToPager, KernelCompletionData, RequestFromKernel, @@ -23,6 +26,8 @@ struct Foo { } extern crate twizzler_runtime; +pub static EXECUTOR: OnceLock = OnceLock::new(); + fn main() { let idstr = std::env::args().nth(1).unwrap(); let kidstr = std::env::args().nth(2).unwrap(); @@ -52,30 +57,33 @@ fn main() { let kqueue = twizzler_queue::Queue::::from(kobject); let sq = twizzler_queue::QueueSender::new(kqueue); + let ex = EXECUTOR.get_or_init(|| Executor::new()); + let num_threads = 2; for _ in 0..(num_threads - 1) { - std::thread::spawn(|| twizzler_async::run(std::future::pending::<()>())); + std::thread::spawn(|| block_on(ex.run(std::future::pending::<()>()))); } - twizzler_async::Task::spawn(async move { + ex.spawn(async move { loop { - let timeout = twizzler_async::Timer::after(Duration::from_millis(1000)); - println!("pager submitting request"); + let timeout = Timer::after(Duration::from_millis(1000)); + println!(" pager: submitting request"); let res = sq.submit_and_wait(RequestFromPager::new( twizzler_abi::pager::PagerRequest::EchoReq, )); let x = res.await; - println!("pager got {:?} in response", x); + println!(" pager: got {:?} in response", x); timeout.await; - break; + // TODO: do some other stuff? + std::future::pending::<()>().await; } }) .detach(); - twizzler_async::Task::spawn(async move { + ex.spawn(async move { loop { let (id, request) = rq.receive().await.unwrap(); - println!("got req from kernel: {} {:?}", id, request); + println!(" pager: got req from kernel: {} {:?}", id, request); let reply = handle_request(request).await; if let Some(reply) = reply { rq.complete(id, reply).await.unwrap(); @@ -83,7 +91,7 @@ fn main() { } }) .detach(); - twizzler_async::run(std::future::pending::<()>()); + block_on(ex.run(std::future::pending::<()>())); } static mut RAND_STATE: u32 = 0; diff --git a/src/bin/pager/src/nvme/controller.rs b/src/bin/pager/src/nvme/controller.rs index 517c9a3c..27031493 100644 --- a/src/bin/pager/src/nvme/controller.rs +++ b/src/bin/pager/src/nvme/controller.rs @@ -3,6 +3,7 @@ use std::{ sync::{Arc, Mutex, OnceLock, RwLock}, }; +use async_executor::Task; use nvme::{ admin::{CreateIOCompletionQueue, CreateIOSubmissionQueue}, ds::{ @@ -16,7 +17,6 @@ use nvme::{ hosted::memory::{PhysicalPageCollection, PrpMode}, nvm::{ReadDword13, WriteDword13}, }; -use twizzler_async::Task; use twizzler_driver::{ dma::{DmaOptions, DmaPool, DMA_PAGE_SIZE}, request::{Requester, SubmitRequest, SubmitSummaryWithResponses}, @@ -144,7 +144,7 @@ pub async fn init_controller(ctrl: &mut Arc) { let req2 = req.clone(); let ctrl2 = ctrl.clone(); - let task = Task::spawn(async move { + let task = super::super::EXECUTOR.get().unwrap().spawn(async move { loop { let _i = int.next().await; //println!("got interrupt"); diff --git a/src/bin/pager/src/store.rs b/src/bin/pager/src/store.rs index 480c4299..4c393898 100644 --- a/src/bin/pager/src/store.rs +++ b/src/bin/pager/src/store.rs @@ -6,6 +6,7 @@ use std::{ sync::Arc, }; +use futures::executor::block_on; use tickv::{success_codes::SuccessCode, ErrorCode, FlashController}; use twizzler_object::ObjID; @@ -32,7 +33,7 @@ impl FlashController for Storage { offset: usize, buf: &mut [u8; BLOCK_SIZE], ) -> Result<(), tickv::ErrorCode> { - twizzler_async::block_on(self.nvme.read_page(region_number as u64 * 8, buf, offset)) + block_on(self.nvme.read_page(region_number as u64 * 8, buf, offset)) .map_err(|_| tickv::ErrorCode::ReadFail) } @@ -42,7 +43,7 @@ impl FlashController for Storage { let start = (address / BLOCK_SIZE) * SECTORS_TO_BLOCK; let thislen = min(BLOCK_SIZE - offset, buf.len()); - twizzler_async::block_on(self.nvme.write_page(start as u64, &buf[0..thislen], offset)) + block_on(self.nvme.write_page(start as u64, &buf[0..thislen], offset)) .map_err(|_| tickv::ErrorCode::WriteFail)?; buf = &buf[thislen..buf.len()]; @@ -52,7 +53,7 @@ impl FlashController for Storage { } fn erase_region(&self, region_number: usize) -> Result<(), tickv::ErrorCode> { - twizzler_async::block_on(self.nvme.write_page( + block_on(self.nvme.write_page( (region_number * SECTORS_TO_BLOCK) as u64, &[0xffu8; BLOCK_SIZE], 0, diff --git a/src/kernel/Cargo.toml b/src/kernel/Cargo.toml index 18f4d013..cd7d2e5d 100644 --- a/src/kernel/Cargo.toml +++ b/src/kernel/Cargo.toml @@ -32,6 +32,7 @@ limine = "0.2.0" twizzler-queue-raw = { version = "*", path = "../lib/twizzler-queue-raw", default-features = false } #syscall_encode = { version = "0.1.2" } volatile = "0.5" +stable-vec = { version = "0.4", default-features = false, features = [] } # for crypto p256 = { version = "0.13.2", default-features = false, features = ["ecdsa"] } log = "*" diff --git a/src/kernel/src/condvar.rs b/src/kernel/src/condvar.rs index 293d059e..ade5d530 100644 --- a/src/kernel/src/condvar.rs +++ b/src/kernel/src/condvar.rs @@ -1,3 +1,5 @@ +use alloc::vec::Vec; + use intrusive_collections::{intrusive_adapter, KeyAdapter, RBTree}; use twizzler_abi::{object::ObjID, thread::ExecutionState}; @@ -65,7 +67,13 @@ impl CondVar { pub fn signal(&self) { let mut inner = self.inner.lock(); let mut node = inner.queue.front_mut(); + let mut threads_to_wake = Vec::new(); while let Some(t) = node.remove() { + threads_to_wake.push(t); + } + + drop(inner); + for t in threads_to_wake { schedule_thread(t); } } @@ -96,7 +104,6 @@ mod tests { #[kernel_test] fn test_condvar() { - //logln!("a: {}", crate::interrupt::disable()); let lock = Arc::new(Spinlock::new(0)); let cv = Arc::new(CondVar::new()); let cv2 = cv.clone(); diff --git a/src/kernel/src/idcounter.rs b/src/kernel/src/idcounter.rs index b3794b85..5ec6ead9 100644 --- a/src/kernel/src/idcounter.rs +++ b/src/kernel/src/idcounter.rs @@ -35,6 +35,17 @@ impl SimpleId { } } +impl From for SimpleId { + fn from(value: u32) -> Self { + Self { id: value as u64 } + } +} +impl From for SimpleId { + fn from(value: u64) -> Self { + Self { id: value } + } +} + impl IdCounter { pub const fn new() -> Self { Self { diff --git a/src/kernel/src/log.rs b/src/kernel/src/log.rs index 9520ce89..302e5552 100755 --- a/src/kernel/src/log.rs +++ b/src/kernel/src/log.rs @@ -388,29 +388,3 @@ macro_rules! emerglogln { $crate::emerglog!(concat!($fmt, "\n"), $($arg)*) }; } - -use log::{Level, Metadata, Record}; - -struct SimpleLogger; - -impl log::Log for SimpleLogger { - fn enabled(&self, metadata: &Metadata) -> bool { - metadata.level() <= Level::Info - } - - fn log(&self, record: &Record) { - if self.enabled(record.metadata()) { - logln!("{} - {}", record.level(), record.args()); - } - } - - fn flush(&self) {} -} - -use log::{LevelFilter, SetLoggerError}; - -static LOGGER: SimpleLogger = SimpleLogger; - -pub fn init() { - let _ = log::set_logger(&LOGGER).map(|()| log::set_max_level(LevelFilter::Info)); -} diff --git a/src/kernel/src/machine/pc/serial.rs b/src/kernel/src/machine/pc/serial.rs index c07af4d6..68410a00 100755 --- a/src/kernel/src/machine/pc/serial.rs +++ b/src/kernel/src/machine/pc/serial.rs @@ -64,8 +64,8 @@ impl SerialPort { // Enable DLAB self.write_reg(Self::LINE_CTRL, 0x80); - // Set maximum speed to 38400 bps by configuring DLL and DLM - self.write_reg(Self::DATA, 0x03); + // Set maximum speed to 115200 bps by configuring DLL and DLM + self.write_reg(Self::DATA, 0x01); self.write_reg(Self::INT_EN, 0x00); // Disable DLAB and set data word length to 8 bits @@ -193,6 +193,11 @@ lazy_static! { serial_port.init(); SimpleLock::new(serial_port) }; + static ref SERIAL2: SimpleLock = { + let mut serial_port = unsafe { SerialPort::new(0x2f8) }; + serial_port.init(); + SimpleLock::new(serial_port) + }; } pub fn late_init() { @@ -225,5 +230,8 @@ pub fn write(data: &[u8], _flags: crate::log::KernelConsoleWriteFlags) { let _ = SERIAL1 .lock() .write_str(core::str::from_utf8_unchecked(data)); + let _ = SERIAL2 + .lock() + .write_str(core::str::from_utf8_unchecked(data)); } } diff --git a/src/kernel/src/main.rs b/src/kernel/src/main.rs index a5c49227..da291e38 100755 --- a/src/kernel/src/main.rs +++ b/src/kernel/src/main.rs @@ -82,7 +82,6 @@ pub fn is_test_mode() -> bool { fn kernel_main(boot_info: &mut B) -> ! { arch::init(boot_info); - log::init(); logln!("[kernel] boot with cmd `{}'", boot_info.get_cmd_line()); let cmdline = boot_info.get_cmd_line(); for opt in cmdline.split(" ") { diff --git a/src/kernel/src/pager.rs b/src/kernel/src/pager.rs index 4999ea8b..55dfacc9 100644 --- a/src/kernel/src/pager.rs +++ b/src/kernel/src/pager.rs @@ -1,3 +1,5 @@ +use inflight::InflightManager; +use request::ReqKind; use twizzler_abi::{ object::ObjID, pager::{CompletionToKernel, CompletionToPager, RequestFromKernel, RequestFromPager}, @@ -5,90 +7,72 @@ use twizzler_abi::{ }; use crate::{ - obj::{lookup_object, LookupFlags}, + mutex::Mutex, + obj::{lookup_object, LookupFlags, ObjectRef}, + once::Once, queue::{ManagedQueueReceiver, ManagedQueueSender, QueueObject}, sched::schedule, + syscall::sync::finish_blocking, thread::{current_thread_ref, entry::start_new_kernel, priority::Priority}, }; -struct PagerQueues { - sender: Option>, - receiver: Option>, -} +mod inflight; +mod queues; +mod request; -static mut PAGER_QUEUES: PagerQueues = PagerQueues { - sender: None, - receiver: None, -}; +pub use inflight::Inflight; +pub use queues::init_pager_queue; +pub use request::Request; +/* extern "C" fn pager_entry() { pager_main(); } - -extern "C" fn pager_compl_handler_entry() { - pager_compl_handler_main(); -} - -extern "C" fn pager_request_handler_entry() { - pager_request_handler_main(); -} - -fn pager_request_handler_main() { - let receiver = unsafe { PAGER_QUEUES.receiver.as_ref().unwrap() }; - loop { - receiver.handle_request(|id, req| { - logln!("kernel got req {}:{:?} from pager", id, req); - CompletionToPager::new(twizzler_abi::pager::PagerCompletionData::EchoResp) - }); - } -} - -fn pager_compl_handler_main() { - let sender = unsafe { PAGER_QUEUES.sender.as_ref().unwrap() }; - loop { - sender.process_completion(); - } -} - fn pager_main() { - logln!("hello from pager thread"); + logln!("kernel: hello from pager thread"); let sender = unsafe { PAGER_QUEUES.sender.as_ref().unwrap() }; loop { let out = sender.submit(RequestFromKernel::new( twizzler_abi::pager::KernelCommand::EchoReq, )); - logln!("submitted request"); + logln!("kernel: submitted request"); let resp = out.wait(); logln!("got response: {:?}", resp); current_thread_ref() .unwrap() .set_state(ExecutionState::Sleeping); + logln!("kernel: got response: {:?}", resp); + // TODO: enter normal pager operation... schedule(false); } } +*/ -pub fn init_pager_queue(id: ObjID, outgoing: bool) { - let obj = match lookup_object(id, LookupFlags::empty()) { - crate::obj::LookupResult::Found(o) => o, - _ => panic!("pager queue not found"), - }; - logln!( - "[kernel-pager] registered {} pager queue: {}", - if outgoing { "sender" } else { "receiver" }, - id - ); - if outgoing { - let queue = QueueObject::::from_object(obj); - let sender = ManagedQueueSender::new(queue); - unsafe { PAGER_QUEUES.sender = Some(sender) }; - } else { - let queue = QueueObject::::from_object(obj); - let receiver = ManagedQueueReceiver::new(queue); - unsafe { PAGER_QUEUES.receiver = Some(receiver) }; - } - if unsafe { PAGER_QUEUES.receiver.is_some() && PAGER_QUEUES.sender.is_some() } { - start_new_kernel(Priority::REALTIME, pager_entry, 0); - start_new_kernel(Priority::REALTIME, pager_compl_handler_entry, 0); - start_new_kernel(Priority::REALTIME, pager_request_handler_entry, 0); +lazy_static::lazy_static! { + static ref INFLIGHT_MGR: Mutex = Mutex::new(InflightManager::new()); +} + +pub fn lookup_object_and_wait(id: ObjID) -> Option { + loop { + logln!("trying to lookup info about object {}", id); + + match crate::obj::lookup_object(id, LookupFlags::empty()) { + crate::obj::LookupResult::Found(arc) => return Some(arc), + _ => {} + } + + let mut mgr = INFLIGHT_MGR.lock(); + let inflight = mgr.add_request(ReqKind::new_info(id)); + drop(mgr); + if let Some(pager_req) = inflight.pager_req() { + queues::submit_pager_request(pager_req); + } + + let mut mgr = INFLIGHT_MGR.lock(); + let thread = current_thread_ref().unwrap(); + if let Some(guard) = mgr.setup_wait(&inflight, &thread) { + drop(mgr); + finish_blocking(guard); + }; } } diff --git a/src/kernel/src/pager/inflight.rs b/src/kernel/src/pager/inflight.rs new file mode 100644 index 00000000..2dcd4046 --- /dev/null +++ b/src/kernel/src/pager/inflight.rs @@ -0,0 +1,129 @@ +use alloc::collections::{btree_map::BTreeMap, btree_set::BTreeSet}; + +use stable_vec::StableVec; +use twizzler_abi::{object::ObjID, pager::RequestFromKernel}; + +use super::{request::ReqKind, Request}; +use crate::thread::{CriticalGuard, ThreadRef}; + +pub struct Inflight { + id: usize, + rk: ReqKind, +} + +impl Inflight { + pub(super) fn new(id: usize, rk: ReqKind) -> Self { + Self { id, rk } + } + + pub(super) fn pager_req(&self) -> Option { + todo!() + } +} + +#[derive(Default)] +struct PerObjectData { + page_map: BTreeMap>, + info_list: BTreeSet, +} + +impl PerObjectData { + fn insert(&mut self, rk: ReqKind, id: usize) { + for page in rk.pages() { + self.page_map.entry(page).or_default().insert(id); + } + if rk.needs_info() { + self.info_list.insert(id); + } + } + + fn remove_all(&mut self, rk: ReqKind, id: usize) { + for page in rk.pages() { + self.page_map.entry(page).or_default().remove(&id); + } + if rk.needs_info() { + self.info_list.remove(&id); + } + } +} + +pub(super) struct InflightManager { + requests: StableVec, + req_map: BTreeMap, + per_object: BTreeMap, +} + +impl InflightManager { + pub fn new() -> Self { + Self { + requests: StableVec::new(), + req_map: BTreeMap::new(), + per_object: BTreeMap::new(), + } + } + + pub fn add_request(&mut self, rk: ReqKind) -> Inflight { + if let Some(id) = self.req_map.get(&rk) { + return Inflight::new(*id, rk); + } + let id = self.requests.next_push_index(); + let request = Request::new(id, rk); + self.requests.push(request); + self.req_map.insert(rk, id); + let per_obj = self + .per_object + .entry(rk.objid()) + .or_insert_with(|| PerObjectData::default()); + per_obj.insert(rk, id); + Inflight::new(id, rk) + } + + fn remove_request(&mut self, id: usize) { + let Some(request) = self.requests.get(id) else { + return; + }; + self.req_map.remove(&request.reqkind()); + if let Some(po) = self.per_object.get_mut(&request.reqkind().objid()) { + po.remove_all(request.reqkind(), id); + } + } + + pub fn setup_wait<'a>( + &mut self, + inflight: &Inflight, + thread: &'a ThreadRef, + ) -> Option> { + let Some(request) = self.requests.get_mut(inflight.id) else { + return None; + }; + request.setup_wait(thread) + } + + pub fn info_ready(&mut self, objid: ObjID) { + if let Some(po) = self.per_object.get_mut(&objid) { + for id in &po.info_list { + if let Some(req) = self.requests.get_mut(*id) { + req.info_ready(); + } else { + logln!("[pager] warning -- stale ID"); + } + } + } + } + + pub fn pages_ready(&mut self, objid: ObjID, pages: impl IntoIterator) { + if let Some(po) = self.per_object.get_mut(&objid) { + for page in pages { + if let Some(idset) = po.page_map.get(&page) { + for id in idset { + if let Some(req) = self.requests.get_mut(*id) { + req.page_ready(page); + } else { + logln!("[pager] warning -- stale ID"); + } + } + } + } + } + } +} diff --git a/src/kernel/src/pager/queues.rs b/src/kernel/src/pager/queues.rs new file mode 100644 index 00000000..098e410c --- /dev/null +++ b/src/kernel/src/pager/queues.rs @@ -0,0 +1,82 @@ +use twizzler_abi::{ + object::ObjID, + pager::{ + CompletionToKernel, CompletionToPager, KernelCommand, RequestFromKernel, RequestFromPager, + }, +}; + +use super::request::ReqKind; +use crate::{ + idcounter::{IdCounter, SimpleId}, + obj::{lookup_object, LookupFlags}, + once::Once, + queue::{ManagedQueueReceiver, ManagedQueueSender, QueueObject}, + thread::{entry::start_new_kernel, priority::Priority}, +}; + +static SENDER: Once<( + IdCounter, + QueueObject, +)> = Once::new(); +static RECEIVER: Once> = Once::new(); + +pub(super) fn pager_request_handler_main() { + let receiver = RECEIVER.wait(); + loop { + receiver.handle_request(|id, req| { + logln!("kernel: got req {}:{:?} from pager", id, req); + CompletionToPager::new(twizzler_abi::pager::PagerCompletionData::EchoResp) + }); + } +} + +pub(super) fn pager_compl_handler_main() { + let sender = SENDER.wait(); + loop { + let completion = sender.1.recv_completion(); + match completion.1.data() { + twizzler_abi::pager::KernelCompletionData::EchoResp => { + logln!("got echo response"); + } + } + sender.0.release_simple(SimpleId::from(completion.0)); + } +} + +pub fn submit_pager_request(item: RequestFromKernel) { + let sender = SENDER.wait(); + let id = sender.0.next_simple().value() as u32; + SENDER.wait().1.submit(item, id); +} + +extern "C" fn pager_compl_handler_entry() { + pager_compl_handler_main(); +} + +extern "C" fn pager_request_handler_entry() { + pager_request_handler_main(); +} + +pub fn init_pager_queue(id: ObjID, outgoing: bool) { + let obj = match lookup_object(id, LookupFlags::empty()) { + crate::obj::LookupResult::Found(o) => o, + _ => panic!("pager queue not found"), + }; + logln!( + "[kernel-pager] registered {} pager queue: {}", + if outgoing { "sender" } else { "receiver" }, + id + ); + if outgoing { + let queue = QueueObject::::from_object(obj); + SENDER.call_once(|| (IdCounter::new(), queue)); + } else { + let queue = QueueObject::::from_object(obj); + let receiver = ManagedQueueReceiver::new(queue); + RECEIVER.call_once(|| receiver); + } + if SENDER.poll().is_some() && RECEIVER.poll().is_some() { + start_new_kernel(Priority::default_user(), pager_compl_handler_entry, 0); + start_new_kernel(Priority::default_user(), pager_request_handler_entry, 0); + } +} diff --git a/src/kernel/src/pager/request.rs b/src/kernel/src/pager/request.rs new file mode 100644 index 00000000..e71f6179 --- /dev/null +++ b/src/kernel/src/pager/request.rs @@ -0,0 +1,101 @@ +use alloc::{collections::btree_set::BTreeSet, vec::Vec}; + +use twizzler_abi::object::ObjID; + +use crate::{ + sched::schedule_thread, + thread::{CriticalGuard, ThreadRef}, +}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub(super) enum ReqKind { + Info(ObjID), + PageData(ObjID, usize, usize), +} + +impl ReqKind { + pub fn new_info(obj_id: ObjID) -> Self { + ReqKind::Info(obj_id) + } + + pub fn new_page_data(obj_id: ObjID, start: usize, len: usize) -> Self { + ReqKind::PageData(obj_id, start, len) + } + + pub fn pages(&self) -> impl Iterator { + match self { + ReqKind::Info(_) => (0..0).into_iter(), + ReqKind::PageData(_, start, len) => (*start..(*start + *len)).into_iter(), + } + } + + pub fn needs_info(&self) -> bool { + matches!(self, ReqKind::Info(_)) + } + + pub fn objid(&self) -> ObjID { + match self { + ReqKind::Info(obj_id) => *obj_id, + ReqKind::PageData(obj_id, _, _) => *obj_id, + } + } +} + +pub struct Request { + id: usize, + reqkind: ReqKind, + remaining_pages: BTreeSet, + info_ready: Option, + waiting_threads: Vec, +} + +impl Request { + pub fn new(id: usize, reqkind: ReqKind) -> Self { + let mut remaining_pages = BTreeSet::new(); + for page in reqkind.pages() { + remaining_pages.insert(page); + } + Self { + id, + reqkind, + info_ready: if reqkind.needs_info() { + Some(false) + } else { + None + }, + waiting_threads: Vec::new(), + remaining_pages, + } + } + + pub fn reqkind(&self) -> ReqKind { + self.reqkind + } + + pub fn done(&self) -> bool { + self.info_ready.unwrap_or(true) && self.remaining_pages.is_empty() + } + + pub fn signal(&mut self) { + for thread in self.waiting_threads.drain(..) { + schedule_thread(thread); + } + } + + pub fn info_ready(&mut self) { + self.info_ready.as_mut().map(|b| *b = true); + } + + pub fn page_ready(&mut self, page: usize) { + self.remaining_pages.remove(&page); + } + + pub fn setup_wait<'a>(&mut self, thread: &'a ThreadRef) -> Option> { + if self.done() { + return None; + } + let critical = thread.enter_critical(); + self.waiting_threads.push(thread.clone()); + Some(critical) + } +} diff --git a/src/kernel/src/queue.rs b/src/kernel/src/queue.rs index c797443b..9eba8573 100644 --- a/src/kernel/src/queue.rs +++ b/src/kernel/src/queue.rs @@ -154,6 +154,7 @@ impl QueueObject { } pub fn complete(&self, item: C, info: u32) { + logln!("kernel: completing!! {}", info); self.completions.send(item, info) } diff --git a/src/kernel/src/syscall/sync.rs b/src/kernel/src/syscall/sync.rs index bbc739f8..52225410 100644 --- a/src/kernel/src/syscall/sync.rs +++ b/src/kernel/src/syscall/sync.rs @@ -53,7 +53,7 @@ pub fn remove_from_requeue(thread: &ThreadRef) { // TODO: this is gross, we're manually trading out a critical guard with an interrupt guard because // we don't want to get interrupted... we need a better way to do this kind of consumable "don't // schedule until I say so". -fn finish_blocking(guard: CriticalGuard) { +pub fn finish_blocking(guard: CriticalGuard) { let thread = current_thread_ref().unwrap(); crate::interrupt::with_disabled(|| { drop(guard); diff --git a/src/lib/twizzler-net/src/nm_handle.rs b/src/lib/twizzler-net/src/nm_handle.rs index 0688d7ce..a9957f8a 100644 --- a/src/lib/twizzler-net/src/nm_handle.rs +++ b/src/lib/twizzler-net/src/nm_handle.rs @@ -56,7 +56,7 @@ pub struct NmHandleManager { } impl NmHandle { - pub async fn handle<'a, F, Fut>(self: &'a Arc, f: F) -> Result<(), QueueError> + pub async fn handle<'a, F, Fut>(self: &'a Arc, f: F) -> Result<(), std::io::Error> where F: Fn(&'a Arc, u32, RxRequest) -> Fut, Fut: Future, @@ -64,7 +64,7 @@ impl NmHandle { self.handler.handle(move |id, req| f(self, id, req)).await } - pub async fn submit(&self, req: TxRequest) -> Result { + pub async fn submit(&self, req: TxRequest) -> Result { self.sender.submit_and_wait(req).await } @@ -125,21 +125,21 @@ impl NmHandleManager { &self.data } - pub async fn receive(&self) -> Result<(u32, TxRequest), QueueError> { + pub async fn receive(&self) -> Result<(u32, TxRequest), std::io::Error> { if self.is_terminated() { - Err(QueueError::Unknown) + Err(QueueError::Unknown.into()) } else { self.handler.receive().await } } - pub async fn complete(&self, id: u32, reply: TxCompletion) -> Result<(), QueueError> { + pub async fn complete(&self, id: u32, reply: TxCompletion) -> Result<(), std::io::Error> { self.handler.complete(id, reply).await } - pub async fn submit(&self, req: RxRequest) -> Result { + pub async fn submit(&self, req: RxRequest) -> Result { if self.is_terminated() { - return Err(QueueError::Unknown); + return Err(QueueError::Unknown.into()); } self.sender.submit_and_wait(req).await } diff --git a/src/lib/twizzler-queue-raw/src/lib.rs b/src/lib/twizzler-queue-raw/src/lib.rs index b13b323b..1a5f91ef 100644 --- a/src/lib/twizzler-queue-raw/src/lib.rs +++ b/src/lib/twizzler-queue-raw/src/lib.rs @@ -77,6 +77,7 @@ use core::{ cell::UnsafeCell, + fmt::Display, marker::PhantomData, sync::atomic::{AtomicU32, AtomicU64, Ordering}, }; @@ -329,15 +330,20 @@ impl RawQueueHdr { fn setup_rec_sleep_simple(&self) -> (&AtomicU64, u64) { // TODO: an interface that undoes this. self.consumer_set_waiting(true); - let b = self.bell.load(Ordering::SeqCst); - (&self.bell, b) + let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff; + (&self.bell, t) } fn setup_send_sleep_simple(&self) -> (&AtomicU64, u64) { // TODO: an interface that undoes this. self.submitter_waiting(); - let t = self.tail.load(Ordering::SeqCst); - (&self.tail, t) + let t = self.tail.load(Ordering::SeqCst) & 0x7fffffff; + let h = self.head.load(Ordering::SeqCst) & 0x7fffffff; + if self.is_full(h, t) { + (&self.tail, t) + } else { + (&self.tail, u64::MAX) + } } fn setup_rec_sleep<'a, T>( @@ -413,6 +419,27 @@ pub enum QueueError { WouldBlock, } +impl Display for QueueError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::Unknown => write!(f, "unknown"), + Self::WouldBlock => write!(f, "would block"), + } + } +} + +impl core::error::Error for QueueError {} + +#[cfg(feature = "std")] +impl From for std::io::Error { + fn from(err: QueueError) -> Self { + match err { + QueueError::WouldBlock => std::io::Error::from(std::io::ErrorKind::WouldBlock), + _ => std::io::Error::from(std::io::ErrorKind::Other), + } + } +} + impl RawQueue { /// Construct a new raw queue out of a header reference and a buffer pointer. /// # Safety diff --git a/src/lib/twizzler-queue/Cargo.toml b/src/lib/twizzler-queue/Cargo.toml index 824b4c9d..11ae7197 100644 --- a/src/lib/twizzler-queue/Cargo.toml +++ b/src/lib/twizzler-queue/Cargo.toml @@ -8,6 +8,8 @@ authors = ["Daniel Bittman "] [dependencies] twizzler-queue-raw = { path = "../twizzler-queue-raw" } -twizzler-async = { path = "../twizzler-async" } +twizzler-futures = { path = "../twizzler-futures" } twizzler-abi = { path = "../twizzler-abi" } twizzler-object = { path = "../twizzler-object" } +async-io = "*" +futures = "*" diff --git a/src/lib/twizzler-queue/src/callback_queue.rs b/src/lib/twizzler-queue/src/callback_queue.rs index d959115b..710d1adf 100644 --- a/src/lib/twizzler-queue/src/callback_queue.rs +++ b/src/lib/twizzler-queue/src/callback_queue.rs @@ -1,7 +1,7 @@ -use std::future::Future; +use std::{future::Future, pin::Pin}; -use twizzler_async::{AsyncDuplex, AsyncDuplexSetup}; -use twizzler_queue_raw::{QueueError, ReceiveFlags, SubmissionFlags}; +use async_io::Async; +use twizzler_queue_raw::{ReceiveFlags, SubmissionFlags}; use crate::Queue; @@ -11,9 +11,22 @@ struct CallbackQueueReceiverInner { /// A receiver-side async-enabled queue abstraction. pub struct CallbackQueueReceiver { - inner: AsyncDuplex>, + inner: Async>>>, } +impl twizzler_futures::TwizzlerWaitable + for CallbackQueueReceiverInner +{ + fn wait_item_read(&self) -> twizzler_abi::syscall::ThreadSyncSleep { + self.queue.setup_read_sub_sleep() + } + + fn wait_item_write(&self) -> twizzler_abi::syscall::ThreadSyncSleep { + self.queue.setup_write_com_sleep() + } +} + +/* impl AsyncDuplexSetup for CallbackQueueReceiverInner { type ReadError = QueueError; type WriteError = QueueError; @@ -29,43 +42,64 @@ impl AsyncDuplexSetup for CallbackQueueReceiverInner { self.queue.setup_write_com_sleep() } } +*/ -impl CallbackQueueReceiver { +impl CallbackQueueReceiver { /// Create a new CallbackQueueReceiver from a [Queue]. pub fn new(queue: Queue) -> Self { Self { - inner: AsyncDuplex::new(CallbackQueueReceiverInner { queue }), + inner: Async::new(CallbackQueueReceiverInner { queue }).unwrap(), } } /// Handle a request in a closure that returns a completion. - pub async fn handle(&self, f: F) -> Result<(), QueueError> + pub async fn handle(&self, f: F) -> Result<(), std::io::Error> where F: FnOnce(u32, S) -> Fut, Fut: Future, { let (id, item) = self .inner - .read_with(|inner| inner.queue.receive(ReceiveFlags::NON_BLOCK)) + .read_with(|inner| { + inner + .queue + .receive(ReceiveFlags::NON_BLOCK) + .map_err(|e| e.into()) + }) .await?; let reply = f(id, item).await; self.inner - .write_with(|inner| inner.queue.complete(id, reply, SubmissionFlags::NON_BLOCK)) + .write_with(|inner| { + inner + .queue + .complete(id, reply, SubmissionFlags::NON_BLOCK) + .map_err(|e| e.into()) + }) .await?; Ok(()) } /// Receive a request without immediately returning a completion. - pub async fn receive(&self) -> Result<(u32, S), QueueError> { + pub async fn receive(&self) -> Result<(u32, S), std::io::Error> { self.inner - .read_with(|inner| inner.queue.receive(ReceiveFlags::NON_BLOCK)) + .read_with(|inner| { + inner + .queue + .receive(ReceiveFlags::NON_BLOCK) + .map_err(|e| e.into()) + }) .await } /// Send a completion back to the sender. - pub async fn complete(&self, id: u32, reply: C) -> Result<(), QueueError> { + pub async fn complete(&self, id: u32, reply: C) -> Result<(), std::io::Error> { self.inner - .write_with(|inner| inner.queue.complete(id, reply, SubmissionFlags::NON_BLOCK)) + .write_with(|inner| { + inner + .queue + .complete(id, reply, SubmissionFlags::NON_BLOCK) + .map_err(|e| e.into()) + }) .await } } diff --git a/src/lib/twizzler-queue/src/sender_queue.rs b/src/lib/twizzler-queue/src/sender_queue.rs index 724a7d30..6617dd3d 100644 --- a/src/lib/twizzler-queue/src/sender_queue.rs +++ b/src/lib/twizzler-queue/src/sender_queue.rs @@ -1,6 +1,7 @@ use std::{ collections::BTreeMap, future::Future, + pin::Pin, sync::{ atomic::{AtomicU32, Ordering}, Arc, Mutex, @@ -8,7 +9,8 @@ use std::{ task::{Poll, Waker}, }; -use twizzler_async::{AsyncDuplex, AsyncDuplexSetup}; +use async_io::Async; +use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use twizzler_queue_raw::{QueueError, ReceiveFlags, SubmissionFlags}; use crate::Queue; @@ -22,12 +24,12 @@ struct WaitPoint { waker: Option, } -struct WaitPointFuture<'a, S, C> { +struct WaitPointFuture<'a, S: Copy + Send + Sync, C: Copy + Send + Sync> { state: Arc>>, sender: &'a QueueSender, } -impl<'a, S: Copy, C: Copy> Future for WaitPointFuture<'a, S, C> { +impl<'a, S: Copy + Send + Sync, C: Copy + Send + Sync> Future for WaitPointFuture<'a, S, C> { type Output = Result<(u32, C), QueueError>; fn poll( @@ -52,36 +54,30 @@ impl<'a, S: Copy, C: Copy> Future for WaitPointFuture<'a, S, C> { /// completions to indicate that a request has been finished, and that the send ID can be reused. /// /// Thus, this queue interally allocates, sends, and reuses IDs for requests. -pub struct QueueSender { +pub struct QueueSender { counter: AtomicU32, reuse: Mutex>, - inner: AsyncDuplex>, + inner: Async>>>, calls: Mutex>>>>, } -impl AsyncDuplexSetup for QueueSenderInner { - type ReadError = QueueError; - type WriteError = QueueError; - - const READ_WOULD_BLOCK: Self::ReadError = QueueError::WouldBlock; - const WRITE_WOULD_BLOCK: Self::WriteError = QueueError::WouldBlock; - - fn setup_read_sleep(&self) -> twizzler_abi::syscall::ThreadSyncSleep { +impl twizzler_futures::TwizzlerWaitable for QueueSenderInner { + fn wait_item_read(&self) -> twizzler_abi::syscall::ThreadSyncSleep { self.queue.setup_read_com_sleep() } - fn setup_write_sleep(&self) -> twizzler_abi::syscall::ThreadSyncSleep { + fn wait_item_write(&self) -> twizzler_abi::syscall::ThreadSyncSleep { self.queue.setup_write_sub_sleep() } } -impl QueueSender { +impl QueueSender { /// Build a new QueueSender from a [Queue]. pub fn new(queue: Queue) -> Self { Self { counter: AtomicU32::new(0), reuse: Mutex::new(vec![]), - inner: AsyncDuplex::new(QueueSenderInner { queue }), + inner: Async::new(QueueSenderInner { queue }).unwrap(), calls: Mutex::new(BTreeMap::new()), } } @@ -130,7 +126,7 @@ impl QueueSender { } /// Submit an item and await a completion. - pub async fn submit_and_wait(&self, item: S) -> Result { + pub async fn submit_and_wait(&self, item: S) -> Result { let id = self.next_id(); let state = Arc::new(Mutex::new(WaitPoint:: { item: None, @@ -145,25 +141,39 @@ impl QueueSender { self.handle_completion(id, item); } self.inner - .write_with(|inner| inner.queue.submit(id, item, SubmissionFlags::NON_BLOCK)) + .write_with(|inner| { + inner + .queue + .submit(id, item, SubmissionFlags::NON_BLOCK) + .map_err(|e| e.into()) + }) .await?; let waiter = WaitPointFuture:: { state, sender: self, }; - let item = Box::pin(waiter); - let recv = Box::pin(async { + let mut item = Box::pin(async { waiter.await }).fuse(); + let mut recv = Box::pin(async { loop { let (id, item) = self .inner - .read_with(|inner| inner.queue.get_completion(ReceiveFlags::NON_BLOCK)) + .read_with(|inner| { + inner + .queue + .get_completion(ReceiveFlags::NON_BLOCK) + .map_err(|e| e.into()) + }) .await .unwrap(); self.handle_completion(id, item); } - }); - let result = twizzler_async::wait_for_first(item, recv).await?; + }) + .fuse(); + let result = futures::select! { + item_res = item => item_res, + recv_res = recv => recv_res, + }?; self.release_id(id); Ok(result.1) }