diff --git a/modules/axhal/src/arch/aarch64/context.rs b/modules/axhal/src/arch/aarch64/context.rs
index 67b923364c..69581f1bd3 100644
--- a/modules/axhal/src/arch/aarch64/context.rs
+++ b/modules/axhal/src/arch/aarch64/context.rs
@@ -85,15 +85,19 @@ impl TaskContext {
     ///
     /// It first saves the current task's context from CPU to this place, and then
     /// restores the next task's context from `next_ctx` to CPU.
-    pub fn switch_to(&mut self, next_ctx: &Self) {
+    pub fn switch_to(&mut self, next_ctx: &Self, _current_task_on_cpu_ptr: &mut bool) {
         #[cfg(feature = "fp_simd")]
         self.fp_state.switch_to(&next_ctx.fp_state);
-        unsafe { context_switch(self, next_ctx) }
+        unsafe { context_switch(self, next_ctx, _current_task_on_cpu_ptr) }
     }
 }
 
 #[naked]
-unsafe extern "C" fn context_switch(_current_task: &mut TaskContext, _next_task: &TaskContext) {
+unsafe extern "C" fn context_switch(
+    _current_task: &mut TaskContext,
+    _next_task: &TaskContext,
+    _current_task_on_cpu_ptr: &mut bool,
+) {
     asm!(
         "
         // save old context (callee-saved registers)
@@ -118,7 +122,7 @@ unsafe extern "C" fn context_switch(_current_task: &mut TaskContext, _next_task:
         ldp     x27, x28, [x1, 10 * 8]
         ldp     x29, x30, [x1, 12 * 8]
 
-        ret",
+        b       finish_switch",
         options(noreturn),
     )
 }
diff --git a/modules/axhal/src/arch/riscv/context.rs b/modules/axhal/src/arch/riscv/context.rs
index 5f9bf3e8aa..d91cce8033 100644
--- a/modules/axhal/src/arch/riscv/context.rs
+++ b/modules/axhal/src/arch/riscv/context.rs
@@ -107,7 +107,7 @@ impl TaskContext {
     ///
     /// It first saves the current task's context from CPU to this place, and then
     /// restores the next task's context from `next_ctx` to CPU.
-    pub fn switch_to(&mut self, next_ctx: &Self) {
+    pub fn switch_to(&mut self, next_ctx: &Self, current_task_on_cpu_ptr: &mut bool) {
         #[cfg(feature = "tls")]
         {
             self.tp = super::read_thread_pointer();
@@ -115,13 +115,17 @@ impl TaskContext {
         }
         unsafe {
             // TODO: switch FP states
-            context_switch(self, next_ctx)
+            context_switch(self, next_ctx, current_task_on_cpu_ptr)
         }
     }
 }
 
 #[naked]
-unsafe extern "C" fn context_switch(_current_task: &mut TaskContext, _next_task: &TaskContext) {
+unsafe extern "C" fn context_switch(
+    _current_task: &mut TaskContext,
+    _next_task: &TaskContext,
+    current_task_on_cpu_ptr: &mut bool,
+) {
     asm!(
         "
         // save old context (callee-saved registers)
@@ -156,7 +160,7 @@ unsafe extern "C" fn context_switch(_current_task: &mut TaskContext, _next_task:
         LDR     sp, a1, 1
         LDR     ra, a1, 0
 
-        ret",
+        j       finish_switch",
         options(noreturn),
     )
 }
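A note on the two ports above: replacing the final `ret` with a plain branch (`b` / `j`) to `finish_switch` is sound because the restore sequence only writes callee-saved registers, so the argument registers (`x0`-`x2` on AArch64, `a0`-`a2` on RISC-V) still hold `_current_task`, `_next_task`, and the `on_cpu` pointer; `finish_switch` then effectively runs as if called by the next task, and its own `ret` lands on the return address just loaded from the next task's context. The fixed offsets in the asm also hard-code the `TaskContext` layout. As a minimal sketch, one could pin that layout with compile-time asserts; the struct below is inferred from the RISC-V restore sequence (`LDR ra, a1, 0`, `LDR sp, a1, 1`) on a 64-bit target, and is an illustration rather than the crate's verbatim definition:

    use core::mem::offset_of;

    #[repr(C)]
    pub struct TaskContext {
        pub ra: usize,        // return address, restored by `LDR ra, a1, 0`
        pub sp: usize,        // stack pointer, restored by `LDR sp, a1, 1`
        pub s: [usize; 12],   // s0..s11, the callee-saved registers
        pub tp: usize,        // thread pointer (feature "tls")
    }

    // If a field ever moves, the build breaks instead of the context switch
    // silently corrupting registers.
    const _: () = assert!(offset_of!(TaskContext, ra) == 0);
    const _: () = assert!(offset_of!(TaskContext, sp) == 8);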
diff --git a/modules/axhal/src/arch/x86_64/context.rs b/modules/axhal/src/arch/x86_64/context.rs
index 4b788dc9c9..6409dd9854 100644
--- a/modules/axhal/src/arch/x86_64/context.rs
+++ b/modules/axhal/src/arch/x86_64/context.rs
@@ -181,7 +181,7 @@ impl TaskContext {
     ///
     /// It first saves the current task's context from CPU to this place, and then
     /// restores the next task's context from `next_ctx` to CPU.
-    pub fn switch_to(&mut self, next_ctx: &Self) {
+    pub fn switch_to(&mut self, next_ctx: &Self, current_task_on_cpu_ptr: &mut bool) {
         #[cfg(feature = "fp_simd")]
         {
             self.ext_state.save();
@@ -192,12 +192,16 @@ impl TaskContext {
             self.fs_base = super::read_thread_pointer();
             unsafe { super::write_thread_pointer(next_ctx.fs_base) };
         }
-        unsafe { context_switch(&mut self.rsp, &next_ctx.rsp) }
+        unsafe { context_switch(self, next_ctx, current_task_on_cpu_ptr) }
     }
 }
 
 #[naked]
-unsafe extern "C" fn context_switch(_current_stack: &mut u64, _next_stack: &u64) {
+unsafe extern "C" fn context_switch(
+    _current_task: &mut TaskContext,
+    _next_task: &TaskContext,
+    current_task_on_cpu_ptr: &mut bool,
+) {
     asm!(
         "
         push    rbp
@@ -206,16 +210,17 @@ unsafe extern "C" fn context_switch(_current_stack: &mut u64, _next_stack: &u64)
         push    r13
         push    r14
         push    r15
-        mov     [rdi], rsp
+        mov     [rdi + {task_sp_ptr}], rsp
 
-        mov     rsp, [rsi]
+        mov     rsp, [rsi + {task_sp_ptr}]
         pop     r15
         pop     r14
        pop     r13
         pop     r12
         pop     rbx
         pop     rbp
-        ret",
+        jmp     finish_switch",
+        task_sp_ptr = const core::mem::offset_of!(TaskContext, rsp),
         options(noreturn),
     )
 }
diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs
index fcae8f99ef..47b72d18fa 100644
--- a/modules/axtask/src/run_queue.rs
+++ b/modules/axtask/src/run_queue.rs
@@ -284,11 +284,7 @@ impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {
     pub fn blocked_resched(&mut self) {
         let curr = crate::current();
-        assert!(
-            curr.is_blocking(),
-            "task is not blocking, {:?}",
-            curr.state()
-        );
+        assert!(curr.is_blocked(), "task is not blocked, {:?}", curr.state());
 
         debug!("task block: {}", curr.id_name());
         self.inner.resched(false);
@@ -323,7 +319,7 @@ impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {
         let now = axhal::time::wall_time();
         if now < deadline {
             crate::timers::set_alarm_wakeup(deadline, curr.clone());
-            curr.set_state(TaskState::Blocking);
+            curr.set_state(TaskState::Blocked);
             self.inner.resched(false);
         }
     }
@@ -341,10 +337,6 @@ impl AxRunQueue {
             }
         }
 
-        if prev.is_blocking() {
-            prev.set_state(TaskState::Blocked);
-        }
-
         let next = self.scheduler.pick_next_task().unwrap_or_else(|| unsafe {
             // Safety: IRQs must be disabled at this time.
             IDLE_TASK.current_ref_raw().get_unchecked().clone()
@@ -369,6 +361,9 @@ impl AxRunQueue {
         #[cfg(feature = "preempt")]
         next_task.set_preempt_pending(false);
         next_task.set_state(TaskState::Running);
+        // Claim the task as running: we do this before switching to it,
+        // so that any task that is actually running has `on_cpu` set.
+        next_task.set_on_cpu(true);
         if prev_task.ptr_eq(&next_task) {
             return;
         }
@@ -377,6 +372,8 @@ impl AxRunQueue {
             let prev_ctx_ptr = prev_task.ctx_mut_ptr();
             let next_ctx_ptr = next_task.ctx_mut_ptr();
 
+            let prev_on_cpu_ptr = prev_task.on_cpu_mut_ptr();
+
             // The strong reference count of `prev_task` will be decremented by 1,
             // but won't be dropped until `gc_entry()` is called.
             assert!(Arc::strong_count(prev_task.as_task_ref()) > 1);
 
             CurrentTask::set_current(prev_task, next_task);
 
-            (*prev_ctx_ptr).switch_to(&*next_ctx_ptr);
+            (*prev_ctx_ptr).switch_to(&*next_ctx_ptr, &mut *prev_on_cpu_ptr);
         }
     }
 }
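`resched()` now snapshots `prev_task.on_cpu_mut_ptr()` before `CurrentTask::set_current` and threads it through `switch_to` into the assembly. Handing an `AtomicBool` across that boundary as a plain `*mut bool` works because the two round-trip losslessly via the stable `AtomicBool::as_ptr` / `AtomicBool::from_ptr` APIs. A minimal host-side sketch of the round trip (variable names are illustrative):

    use core::sync::atomic::{AtomicBool, Ordering};

    fn main() {
        let on_cpu = AtomicBool::new(true);

        // What `on_cpu_mut_ptr()` hands to the context-switch assembly.
        let raw: *mut bool = on_cpu.as_ptr();

        // What `finish_switch()` does on the far side of the switch.
        // Safety: `raw` points to a live `AtomicBool` for the whole call.
        unsafe { AtomicBool::from_ptr(raw) }.store(false, Ordering::Release);

        assert!(!on_cpu.load(Ordering::Acquire));
    }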
diff --git a/modules/axtask/src/task.rs b/modules/axtask/src/task.rs
index b6190adf86..e4de8c27d3 100644
--- a/modules/axtask/src/task.rs
+++ b/modules/axtask/src/task.rs
@@ -29,14 +29,11 @@ pub(crate) enum TaskState {
     Running = 1,
     /// Task is ready to run on some scheduler's ready queue.
     Ready = 2,
-    /// Task is just be blocked and inserted into the wait queue,
-    /// but still have **NOT finished** its scheduling process.
-    Blocking = 3,
     /// Task is blocked (in the wait queue or timer list),
     /// and it has finished its scheduling process, it can be wake up by `notify()` on any run queue safely.
-    Blocked = 4,
+    Blocked = 3,
     /// Task is exited and waiting for being dropped.
-    Exited = 5,
+    Exited = 4,
 }
 
 /// The inner task structure.
@@ -55,6 +52,8 @@ pub struct TaskInner {
     in_wait_queue: AtomicBool,
     #[cfg(feature = "irq")]
     in_timer_list: AtomicBool,
+    /// Used to indicate whether the task is running on a CPU.
+    on_cpu: AtomicBool,
 
     /// Used to protect the task from being unblocked by timer and `notify()` at the same time.
     /// It is used in `unblock_task()`, which is called by wait queue's `notify()` and timer's callback.
@@ -96,9 +95,8 @@ impl From<u8> for TaskState {
         match state {
             1 => Self::Running,
             2 => Self::Ready,
-            3 => Self::Blocking,
-            4 => Self::Blocked,
-            5 => Self::Exited,
+            3 => Self::Blocked,
+            4 => Self::Exited,
             _ => unreachable!(),
         }
     }
 }
@@ -196,6 +194,7 @@ impl TaskInner {
             in_wait_queue: AtomicBool::new(false),
             #[cfg(feature = "irq")]
             in_timer_list: AtomicBool::new(false),
+            on_cpu: AtomicBool::new(false),
             #[cfg(feature = "irq")]
             unblock_lock: SpinRaw::new(()),
             #[cfg(feature = "preempt")]
@@ -259,11 +258,6 @@ impl TaskInner {
         matches!(self.state(), TaskState::Blocked)
     }
 
-    #[inline]
-    pub(crate) fn is_blocking(&self) -> bool {
-        matches!(self.state(), TaskState::Blocking)
-    }
-
     #[inline]
     pub(crate) const fn is_init(&self) -> bool {
         self.is_init
@@ -309,13 +303,21 @@ impl TaskInner {
     where
         F: FnMut(),
     {
-        // When task's state is Blocking, it has not finished its scheduling process.
-        if self.is_blocking() {
-            while self.is_blocking() {
+        debug!("{} unblocking", self.id_name());
+
+        // If the owning (remote) CPU is still in the middle of `schedule()` with
+        // this task as prev, wait until it has finished referencing the task.
+        //
+        // Pairs with the `Release` store in `finish_switch()`.
+        //
+        // This ensures that a task getting woken is fully ordered against its
+        // previous state, preserving program order.
+        if self.on_cpu() {
+            while self.on_cpu() {
                 // Wait for the task to finish its scheduling process.
                 core::hint::spin_loop();
             }
-            assert!(self.is_blocked())
+            assert!(!self.on_cpu())
         }
 
         // When irq is enabled, use `unblock_lock` to protect the task from being unblocked by timer and `notify()` at the same time.
@@ -381,6 +383,27 @@ impl TaskInner {
         self.ctx.get_mut()
     }
 
+    /// Returns the raw pointer to the `on_cpu` field.
+    #[inline]
+    pub(crate) const fn on_cpu_mut_ptr(&self) -> *mut bool {
+        self.on_cpu.as_ptr()
+    }
+
+    /// Sets whether the task is running on a CPU.
+    pub fn set_on_cpu(&self, on_cpu: bool) {
+        self.on_cpu.store(on_cpu, Ordering::Release)
+    }
+
+    /// Returns whether the task is running on a CPU.
+    ///
+    /// It is used to protect the task from being moved to a different run queue
+    /// while it has not finished its scheduling process.
+    /// The `on_cpu` field is set to `true` when the task is about to run on a CPU,
+    /// and set to `false` in `finish_switch` once the task has finished its scheduling process.
+    pub fn on_cpu(&self) -> bool {
+        self.on_cpu.load(Ordering::Acquire)
+    }
+
     /// Returns the top address of the kernel stack.
     #[inline]
     pub const fn kernel_stack_top(&self) -> Option<VirtAddr> {
@@ -499,3 +522,17 @@ extern "C" fn task_entry() -> ! {
     }
     crate::exit(0);
 }
+
+#[no_mangle]
+unsafe extern "C" fn finish_switch(
+    _prev_task: &mut TaskContext,
+    _next_task: &TaskContext,
+    _prev_task_on_cpu_ptr: *mut bool,
+) {
+    // This must be the very last reference to `_prev_task` from this CPU. After
+    // `on_cpu` is cleared, the task can be moved to a different CPU. We
+    // must ensure this doesn't happen until the switch is completely
+    // finished.
+    AtomicBool::from_ptr(_prev_task_on_cpu_ptr).store(false, Ordering::Release);
+    // The `ret` of this function returns into the next task's context.
+}
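The task.rs changes implement the classic `on_cpu` handshake (compare Linux's `try_to_wake_up`): the waker's `Acquire` spin in `unblock_task()` pairs with the `Release` store in `finish_switch()`, so everything the task did before switching out happens-before anything the waking CPU does with it afterwards. A runnable host-side model of the handshake (thread roles and prints are illustrative; this is not the kernel code):

    use core::sync::atomic::{AtomicBool, Ordering};
    use std::{hint, sync::Arc, thread};

    fn main() {
        // The task is still running on "CPU 0".
        let on_cpu = Arc::new(AtomicBool::new(true));

        let waker = {
            let on_cpu = Arc::clone(&on_cpu);
            // "CPU 1": models `unblock_task()`.
            thread::spawn(move || {
                // The Acquire spin pairs with the Release store below, so all
                // writes the task made before switching out are visible here.
                while on_cpu.load(Ordering::Acquire) {
                    hint::spin_loop();
                }
                println!("switch-out finished; safe to requeue the task");
            })
        };

        // "CPU 0": models `finish_switch()`, the last touch of prev.
        on_cpu.store(false, Ordering::Release);
        waker.join().unwrap();
    }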
diff --git a/modules/axtask/src/wait_queue.rs b/modules/axtask/src/wait_queue.rs
index d167ce5a7d..b4f746564a 100644
--- a/modules/axtask/src/wait_queue.rs
+++ b/modules/axtask/src/wait_queue.rs
@@ -74,16 +74,7 @@ impl WaitQueue {
         #[cfg(feature = "preempt")]
         assert!(curr.can_preempt(2));
 
-        // We set task state as `Blocking` to clarify that the task is blocked
-        // but **still NOT** finished its scheduling process.
-        //
-        // When another task (generally on another run queue) try to unblock this task,
-        // * if this task's state is still `Blocking`:
-        //   it needs to wait for this task's state to be changed to `Blocked`, which means it has finished its scheduling process.
-        // * if this task's state is `Blocked`:
-        //   it means this task is blocked and finished its scheduling process, in another word, it has left current run queue,
-        //   so this task can be scheduled on any run queue.
-        curr.set_state(TaskState::Blocking);
+        curr.set_state(TaskState::Blocked);
         curr.set_in_wait_queue(true);
 
         debug!("{} push to wait queue", curr.id_name());
@@ -128,7 +119,7 @@ impl WaitQueue {
         assert!(curr.can_preempt(2));
 
         wq.push_back(curr.clone());
-        curr.set_state(TaskState::Blocking);
+        curr.set_state(TaskState::Blocked);
         curr.set_in_wait_queue(true);
         drop(wq);
@@ -196,7 +187,7 @@ impl WaitQueue {
         assert!(curr.can_preempt(2));
 
         wq.push_back(curr.clone());
-        curr.set_state(TaskState::Blocking);
+        curr.set_state(TaskState::Blocked);
         curr.set_in_wait_queue(true);
         drop(wq);
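With the `Blocking` state gone, a sleeper is already `Blocked` before it calls `resched()`; the only thing that stops a concurrent waker from requeueing it mid-switch is the `on_cpu` spin shown earlier. A sketch of the two sides in usage terms (API names as exported by axtask; the exact paths and signatures here are assumptions, not verified against a particular version):

    use core::sync::atomic::{AtomicBool, Ordering};
    use axtask::WaitQueue; // assumed re-export path

    static READY: AtomicBool = AtomicBool::new(false);
    static WQ: WaitQueue = WaitQueue::new();

    // Sleeper: `wait_until` marks the task Blocked *before* rescheduling;
    // only the switch itself, via `finish_switch`, clears its `on_cpu` flag.
    fn sleeper() {
        WQ.wait_until(|| READY.load(Ordering::Acquire));
    }

    // Waker, possibly on another CPU: `notify_one` reaches `unblock_task()`,
    // which spins while the sleeper's `on_cpu` is still true, so it can never
    // requeue a task whose switch-out has not finished.
    fn waker() {
        READY.store(true, Ordering::Release);
        WQ.notify_one(true); // `true`: allow an immediate reschedule
    }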