Okay, I realized that the whole kind of setup is weird now.
I initially thought I had two ways to signal that a future is done:
- Mark the status flag in the FutureBox
- Send a notification back on the ring buffer
But that seems redundant.
- If I JUST use the status flags, the userspace executor needs some way to wake a task. Otherwise it has to poll all the idle tasks to see if they are ready. This isn't THAT big of a deal, but it seems a little wasteful
- If I use the ringbuffer, I introduce some amount of delay, and some overhead, because the userspace has to parse the incoming messages
I shouldn't call the wakers from kernel space, because that's user controlled code.
Maybe:
- The kernel uses the buffer
- When it's done, it drops/releases the exclusive handle
- It sends a notification back (future addr? waker ptr?)
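A rough model of the "Maybe" option above, assuming the notification carries a plain message ID rather than a future address or waker pointer. That way the kernel never calls user code, and the executor wakes only the tasks that are actually done. `WakeTable`, `drain`, and `CountingWaker` are made-up names for this sketch, and a `VecDeque` stands in for the kernel-to-userspace ring buffer.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical completion message: just an ID, never a pointer the
/// kernel would have to dereference or call.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct MsgId(pub u32);

/// Userspace-side table mapping outstanding request IDs to task wakers.
#[derive(Default)]
pub struct WakeTable {
    waiting: HashMap<MsgId, Waker>,
}

impl WakeTable {
    /// Called by a future when it submits a request and goes to sleep.
    pub fn register(&mut self, id: MsgId, waker: Waker) {
        self.waiting.insert(id, waker);
    }

    /// Called by the executor between polls: drain the notifications and
    /// wake only the tasks the kernel reported as done.
    /// Returns the number of tasks woken.
    pub fn drain(&mut self, ring: &mut VecDeque<MsgId>) -> usize {
        let mut woken = 0;
        while let Some(id) = ring.pop_front() {
            // An unknown ID means the waiting future is already gone.
            if let Some(waker) = self.waiting.remove(&id) {
                waker.wake();
                woken += 1;
            }
        }
        woken
    }
}

/// Helper for experiments: a waker that only counts how often it was woken.
pub struct CountingWaker(pub AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

The cost is the one already noted: userspace has to parse every incoming message. The gain is that idle tasks are never polled just to check a flag.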
Let's look at how the code for this might look.
// Provide a "weak" entry point, that initializes the executor and calls "main".
// This way users can either use the provided one (basically if they are using
// Rust + async/await), or use their own.
//
// I guess this would also initialize the ring buffers and whatever else is expected
// on a normal startup.
//
// In this case, `main` is spawned as the "root task" in the executor
use core::fmt::Write; // needed for `write!` on the String below
use mstd::alloc::String;
async fn main() -> Result<(), ()> {
// This creates a future that:
// * sends a syscall request to open the port
// * This doesn't really need a heap allocation, unless "everything is a future".
// * Even if it does require a heap allocation for the future, it doesn't need any payload.
// The index will be sent along (as owned data) in the request.
// * The executor waits for a response that this has been done, OR it just polls the future?
// This is still an open question.
//
// I guess either way, when the future completes, it needs to "wake" the main task.
//
// TODO: Should this return a "serial handle"? Should there be an error if the serial port has
// already been opened, or should we allow multiple handles to the same serial port?
let mut serial_1 = mstd::serial::open(1).await?;
// This creates a new String, with no contents. It allocates, so we need to await.
// TODO: Should I make the "default" string/vec type Copy on Write to reduce space
// usage?
let mut data_buf: String = String::with_capacity(32).await;
// Note: We can't re-alloc in the write trait, which is why we need to use "with_capacity"
// TODO: Can this be fixed/worked around later? Probably not unless MnemOS gets the ability
// to run threads as well...
write!(&mut data_buf, "Hello {}, Serial World!\r\n", 123)?;
// This actually has two parts:
//
// 1. `serial_1.send(data_buf).await` creates (and awaits) a future that waits
// for the syscall to be transferred to the kernel
// 2. It returns a future that doesn't complete until the SERIAL SEND is complete
// which will happen at some later time
//
// The latter handle can be dropped, and the send will still happen.
let sent_hdl = serial_1.send(data_buf).await;
// This actually ensures the data was sent successfully
sent_hdl.await?;
Ok(())
}
Jeez.
Lots of stuff going on.
Okay, I'm going to have an executor that is global - or at least the spawner or something is going to be global
#[no_mangle]
unsafe fn entry() -> ! {
// Initialize the ringbuffers. The pointers are located in a fixed memory location
let rb = init_ringbuffers();
// Initialize Heap - This data will either be stored at a fixed memory location, or
// will be the first message received from the ring buffer. Not sure yet.
let heap = init_heap();
// This comes from user code
extern "Rust" {
async fn main() -> Result<(), ()>;
}
// Initialize and take the "worker" non-global half of the executor. Initialize it
// with the "root" task and other singleton bits
let mut exec_hdl = init_executor(
main,
rb,
heap,
);
// This loops internally until the program ends, I guess?
exec_hdl.run();
// TODO: I probably need some unsafe method for yeeting the current grant
// in bbqueue, and maybe some way of hard-halting the executor.
panic!();
}
I think I'm going to end up having "global" public bits, like the spawner, and private singleton bits that really only exist in the main function above. The singletons are probably fine to be unsafe.
In the example above, I have:
- serial open
- string allocation
- send string to serial port
- wait for string to be sent
Let's mock those out.
// 1 - serial open
mod serial {
struct SerialHdl {
port: u16,
}
async fn open(port: u16) -> Result<SerialHdl, ()> {
// TODO: Probably make some macro-op to handle this, to ensure that we are listening before we
// send, and to "hide" the ID and stuff from the caller. Something like:
// `fn(SysCallReq) -> RbRxFut`
let id = executor::next_id();
let request = SysCallReqPacket {
id,
msg: SysCallReq::Serial(SerialRequest::Open(port))
};
let rx: RbRxFut = ringbuf.register_rx(id).await;
let tx: RbTxFut = ringbuf.send(request);
tx.await;
// end macro-op
match rx.await? {
SysCallResp::Serial(SerialResp::Opened) => Ok(SerialHdl { port }),
_ => Err(())
}
}
}
impl U2K_PROD {
fn grant_or_enqueue(&'static self, msg: &SysCallReqPacket, cx: &mut Context<'_>) -> Option<()> {
match self.grant() {
Some(mut wgr) => {
// We have a write grant. Serialize the request into the ring buffer
let used = postcard::to_slice(
msg,
&mut wgr,
).unwrap().len();
// While we have a write grant, attempt to wake the next waiting future, if any
if let Some(waiter) = self.consumer().and_then(|c| c.next()) {
waiter.wake();
}
wgr.commit(used);
Some(())
}
None => {
// No write grant. We need to try again later. Push ourselves into the
// queue to be woken when the next grant is available
self.producer().push(cx.waker().clone());
None
}
}
}
}
impl U2K_CONS {
// TODO. Basically We'll have a fixed size array for "awaited packets", with space to store the responses
// Let's say something like eight. This will also have room for the wakers, if any
//
// If a future tries to register and there's no room, we'll stick it in an MpMcQueue, so we can wake it
// when there is room in the "actively waiting" queue.
fn register_rx_or_enqueue(&'static self, id: &MsgId, cx: &mut Context<'_>) -> Option<()> { todo!() }
fn rx_or_enqueue(&'static self, id: &MsgId, cx: &mut Context<'_>) -> Option<Result<SysCallResp, ()>> { todo!() }
}
struct RbTxFut {
req: SysCallReqPacket,
}
impl Future for RbTxFut {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match U2K_PROD.grant_or_enqueue(&self.req, cx) {
Some(()) => Poll::Ready(()),
None => Poll::Pending,
}
}
}
struct RegisterRx {
id: MsgId,
}
impl Future for RegisterRx {
type Output = RbRxFut;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match U2K_CONS.register_rx_or_enqueue(&self.id, cx) {
Some(()) => Poll::Ready(RbRxFut { id: self.id }),
None => Poll::Pending,
}
}
}
struct RbRxFut {
id: MsgId,
}
impl Future for RbRxFut {
type Output = Result<SysCallResp, ()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match U2K_CONS.rx_or_enqueue(&self.id, cx) {
Some(resp) => Poll::Ready(resp),
None => Poll::Pending,
}
}
}
impl RingBuf {
fn send(req: SysCallReqPacket) -> RbTxFut {
RbTxFut { req }
}
async fn register_rx(id: MsgId) -> RbRxFut {
let reg = RegisterRx { id };
reg.await
}
}
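The TODO on `U2K_CONS` describes a fixed table of "awaited packets" plus an overflow queue. Here is a minimal single-threaded sketch of that bookkeeping, assuming eight slots. `RxTable`, `deliver`, `take`, and `next_parked` are invented names, and the overflow queue holds bare IDs where the real thing would hold wakers.

```rust
use std::collections::VecDeque;

pub const SLOTS: usize = 8;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MsgId(pub u32);

/// One entry in the "actively waiting" table.
struct Slot<R> {
    id: MsgId,
    resp: Option<R>,
}

pub struct RxTable<R> {
    slots: [Option<Slot<R>>; SLOTS],
    /// IDs that could not get a slot. A real version would store wakers.
    overflow: VecDeque<MsgId>,
}

impl<R> RxTable<R> {
    pub fn new() -> Self {
        Self {
            slots: std::array::from_fn(|_| None),
            overflow: VecDeque::new(),
        }
    }

    /// Claim a slot for `id`. Returns false (and parks the ID) when full.
    pub fn register(&mut self, id: MsgId) -> bool {
        match self.slots.iter_mut().find(|s| s.is_none()) {
            Some(free) => {
                *free = Some(Slot { id, resp: None });
                true
            }
            None => {
                if !self.overflow.contains(&id) {
                    self.overflow.push_back(id);
                }
                false
            }
        }
    }

    /// Store a response from the kernel. Returns false if nobody registered.
    pub fn deliver(&mut self, id: MsgId, resp: R) -> bool {
        for slot in self.slots.iter_mut().flatten() {
            if slot.id == id {
                slot.resp = Some(resp);
                return true;
            }
        }
        false
    }

    /// Take the response for `id` if it has arrived, freeing the slot.
    pub fn take(&mut self, id: MsgId) -> Option<R> {
        for entry in self.slots.iter_mut() {
            let ready = entry
                .as_ref()
                .map_or(false, |s| s.id == id && s.resp.is_some());
            if ready {
                return entry.take().and_then(|s| s.resp);
            }
        }
        None
    }

    /// Next parked ID to retry (in the real design: the next waker to wake).
    pub fn next_parked(&mut self) -> Option<MsgId> {
        self.overflow.pop_front()
    }
}
```

Note that a slot is only freed by `take`, so this model has exactly the problem described next: if the waiter never comes back, the slot stays full.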
Main issue: this doesn't handle the case where we have a "zombie waker", which could happen if we had a timeout racing with a receive. This would leave the "registered" slot full forever.
I dunno how to solve this.
One way would be to allow the IO provider to (unsafely) check the completion status of the future/task. This wouldn't work if the task was still running.
Another way would be to have a timeout in the io provider itself, though that means that the provider needs to periodically cull items, and think about timeouts.
I think I can use one-shot channels to make sure the buffer gets released whenever the channel is done.
struct OneShot<T> {
status: AtomicU8,
waker: UnsafeCell<MaybeUninit<Waker>>,
data: UnsafeCell<MaybeUninit<T>>,
}
impl<T> OneShot<T> {
// waker and data are invalid, ready to use. OneShot has exclusive access
const IDLE: u8 = 0b0000_0000;
// This bit exists in case the waker and data have been taken, but the consumer
// has not yet been dropped, so !WAKER | !DATA doesn't alias with IDLE
const BUSY: u8 = 0b0000_0001;
// waker is valid. Busy. Shared access
const HAVE_WAKER: u8 = 0b0000_0010;
// data is valid. Busy. Shared access
const HAVE_DATA: u8 = 0b0000_0100;
// data or waker may be valid. OneShot has exclusive access
const CONS_DROPPED: u8 = 0b0000_1000;
// Call this regularly to purge any zombie tasks. If someone owns a bunch of OneShots,
// they should cleanup all of them, and use the first available one on every pass
// e.g. iterate until `reserve` succeeds
pub fn cleanup(&'static self) {
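let status = self.status.load(Ordering::Acquire);
if (status & Self::CONS_DROPPED) != 0 {
// Yes, the consumer was dropped. Release what we have.
if (status & Self::HAVE_WAKER) != 0 {
// SAFETY: HAVE_WAKER means the waker is initialized, and the consumer is gone
unsafe { (*self.waker.get()).assume_init_drop() };
}
if (status & Self::HAVE_DATA) != 0 {
// SAFETY: HAVE_DATA means the data is initialized, and the consumer is gone
unsafe { (*self.data.get()).assume_init_drop() };
}
// This clears all bits, including the BUSY bit
self.status.store(Self::IDLE, Ordering::Release);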
}
}
pub fn reserve(&'static self, waker: Waker) -> Result<OneShotConsumer<T>, Waker> {
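// Is this slot idle? If not, hand the waker back to the caller.
if self.status.compare_exchange(
Self::IDLE,
Self::BUSY,
Ordering::AcqRel,
Ordering::Acquire,
).is_err() {
return Err(waker);
}
// Store the waker first, THEN publish HAVE_WAKER, so `deposit` can never
// observe the bit before the waker has been written.
// SAFETY: the successful exchange above gives us exclusive access to the slot
unsafe { (*self.waker.get()).write(waker) };
self.status.fetch_or(Self::HAVE_WAKER, Ordering::Release);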
Ok(OneShotConsumer { buf: self })
}
pub fn deposit(&'static self, data: T) {
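if (self.status.load(Ordering::Acquire) & Self::HAVE_WAKER) == 0 {
panic!("Tried to deposit without a waker?");
}
// SAFETY: HAVE_WAKER is set, so the waker is initialized. Clearing the bit
// below hands ownership of it to us.
let waker = unsafe { (*self.waker.get()).assume_init_read() };
// SAFETY: HAVE_DATA is clear, so nobody reads the data cell yet
unsafe { (*self.data.get()).write(data) };
// Clear the waker bit, and set the data bit
let old = self.status.fetch_xor(Self::HAVE_WAKER | Self::HAVE_DATA, Ordering::AcqRel);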
if old == (Self::HAVE_WAKER | Self::BUSY) {
waker.wake();
} else {
// The task was probably dropped. The data will be dropped
// on the next call to cleanup
drop(waker);
}
}
fn take_data(&'static self) -> Option<T> {
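let old = self.status.fetch_and(!Self::HAVE_DATA, Ordering::AcqRel);
if (old & Self::HAVE_DATA) == 0 {
// No data yet. BUSY or HAVE_WAKER may still be set, so don't compare to 0
None
} else {
// SAFETY: HAVE_DATA was set and we just cleared it, so we own the data
Some(unsafe { (*self.data.get()).assume_init_read() })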
}
}
unsafe fn drop_cons(&'static self) {
self.status.fetch_or(Self::CONS_DROPPED, Ordering::Release);
}
}
struct OneShotConsumer<T> {
buf: &'static OneShot<T>,
}
impl<T> Future for OneShotConsumer<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.buf.take_data() {
Some(d) => Poll::Ready(d),
None => {
// TODO: rewake context? Probably not?
Poll::Pending
},
}
}
}
impl<T> Drop for OneShotConsumer<T> {
fn drop(&mut self) {
unsafe {
self.buf.drop_cons();
}
}
}
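To sanity-check the lifecycle, here is a deliberately simplified, single-threaded model of the same status bits. It swaps the atomics and `MaybeUninit` for `Cell`/`RefCell<Option<T>>` and leaves out the waker, so it is safe Rust and only demonstrates the "zombie" release path, not the real concurrency story. `Slot`, `Consumer`, and `try_take` are names made up for this sketch.

```rust
use std::cell::{Cell, RefCell};
use std::rc::Rc;

const IDLE: u8 = 0b0000;
const BUSY: u8 = 0b0001;
const HAVE_DATA: u8 = 0b0100;
const CONS_DROPPED: u8 = 0b1000;

pub struct Slot<T> {
    status: Cell<u8>,
    data: RefCell<Option<T>>,
}

pub struct Consumer<T> {
    slot: Rc<Slot<T>>,
}

impl<T> Slot<T> {
    pub fn new() -> Rc<Self> {
        Rc::new(Slot { status: Cell::new(IDLE), data: RefCell::new(None) })
    }

    pub fn is_idle(&self) -> bool {
        self.status.get() == IDLE
    }

    /// Hand out a consumer only if the slot is completely idle.
    pub fn reserve(slot: &Rc<Self>) -> Option<Consumer<T>> {
        if slot.status.get() != IDLE {
            return None;
        }
        slot.status.set(BUSY);
        Some(Consumer { slot: Rc::clone(slot) })
    }

    /// Provider side: store the response and mark it available.
    pub fn deposit(&self, value: T) {
        *self.data.borrow_mut() = Some(value);
        self.status.set(self.status.get() | HAVE_DATA);
    }

    /// Provider side: reclaim the slot once the consumer is gone,
    /// dropping any data that was never picked up.
    pub fn cleanup(&self) {
        if (self.status.get() & CONS_DROPPED) != 0 {
            self.data.borrow_mut().take();
            self.status.set(IDLE);
        }
    }
}

impl<T> Consumer<T> {
    /// Take the data if it has arrived, clearing the HAVE_DATA bit.
    pub fn try_take(&self) -> Option<T> {
        if (self.slot.status.get() & HAVE_DATA) == 0 {
            return None;
        }
        self.slot.status.set(self.slot.status.get() & !HAVE_DATA);
        self.slot.data.borrow_mut().take()
    }
}

impl<T> Drop for Consumer<T> {
    fn drop(&mut self) {
        // Only flag the drop. The provider releases the slot in `cleanup`.
        self.slot.status.set(self.slot.status.get() | CONS_DROPPED);
    }
}
```

The interesting case is the timeout race: reserve a slot, drop the consumer, then let the provider deposit anyway. The slot stays busy until the next `cleanup`, which drops the orphaned data and returns it to idle. The BUSY bit is what keeps a "taken but not yet dropped" slot from looking idle.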