[feat] add on_cpu field in AxTaskInner
hky1999 committed Sep 25, 2024
1 parent ae26348 commit 13fd56b
Showing 6 changed files with 92 additions and 54 deletions.
12 changes: 8 additions & 4 deletions modules/axhal/src/arch/aarch64/context.rs
@@ -85,15 +85,19 @@ impl TaskContext {
///
/// It first saves the current task's context from CPU to this place, and then
/// restores the next task's context from `next_ctx` to CPU.
-pub fn switch_to(&mut self, next_ctx: &Self) {
+pub fn switch_to(&mut self, next_ctx: &Self, _current_task_on_cpu_ptr: &mut bool) {
#[cfg(feature = "fp_simd")]
self.fp_state.switch_to(&next_ctx.fp_state);
-unsafe { context_switch(self, next_ctx) }
+unsafe { context_switch(self, next_ctx, _current_task_on_cpu_ptr) }
}
}

#[naked]
unsafe extern "C" fn context_switch(_current_task: &mut TaskContext, _next_task: &TaskContext) {
unsafe extern "C" fn context_switch(
_current_task: &mut TaskContext,
_next_task: &TaskContext,
_current_task_on_cpu_ptr: &mut bool,
) {
asm!(
"
// save old context (callee-saved registers)
Expand All @@ -118,7 +122,7 @@ unsafe extern "C" fn context_switch(_current_task: &mut TaskContext, _next_task:
ldp x27, x28, [x1, 10 * 8]
ldp x29, x30, [x1, 12 * 8]
ret",
b finish_switch",
options(noreturn),
)
}
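The key change above is the final instruction: the restore path now ends in `b finish_switch` instead of `ret`. Since the restore only writes the callee-saved registers x19–x30 and `sp`, the argument registers x0–x2 still hold `_current_task`, `_next_task`, and `_current_task_on_cpu_ptr` when the branch is taken; `finish_switch` (defined in `modules/axtask/src/task.rs` below) therefore receives them unchanged, clears the previous task's `on_cpu` flag, and its own `ret` consumes the `x30` just loaded from the next task's context. A minimal sketch of this tail-branch pattern, assuming nightly `#[naked]` support as this codebase uses (names and the `u64` signature are invented for illustration, AArch64 only):

```rust
#![feature(naked_functions)]
use core::arch::asm;

extern "C" fn epilogue(a: u64, b: u64) -> u64 {
    // Entered with `tail_demo`'s original arguments; the `ret` emitted
    // here returns directly to `tail_demo`'s caller.
    a + b
}

// `b` (branch without link) leaves x0/x1 and the return address in x30
// untouched, exactly like the `b finish_switch` above.
#[naked]
unsafe extern "C" fn tail_demo(_a: u64, _b: u64) -> u64 {
    asm!("b {target}", target = sym epilogue, options(noreturn))
}
```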
12 changes: 8 additions & 4 deletions modules/axhal/src/arch/riscv/context.rs
@@ -107,21 +107,25 @@ impl TaskContext {
///
/// It first saves the current task's context from CPU to this place, and then
/// restores the next task's context from `next_ctx` to CPU.
-pub fn switch_to(&mut self, next_ctx: &Self) {
+pub fn switch_to(&mut self, next_ctx: &Self, current_task_on_cpu_ptr: &mut bool) {
#[cfg(feature = "tls")]
{
self.tp = super::read_thread_pointer();
unsafe { super::write_thread_pointer(next_ctx.tp) };
}
unsafe {
// TODO: switch FP states
-context_switch(self, next_ctx)
+context_switch(self, next_ctx, current_task_on_cpu_ptr)
}
}
}

#[naked]
unsafe extern "C" fn context_switch(_current_task: &mut TaskContext, _next_task: &TaskContext) {
unsafe extern "C" fn context_switch(
_current_task: &mut TaskContext,
_next_task: &TaskContext,
current_task_on_cpu_ptr: &mut bool,
) {
asm!(
"
// save old context (callee-saved registers)
@@ -156,7 +160,7 @@ unsafe extern "C" fn context_switch(_current_task: &mut TaskContext, _next_task:
LDR sp, a1, 1
LDR ra, a1, 0
ret",
j finish_switch",
options(noreturn),
)
}
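The RISC-V version makes the same move: `j` is an unconditional jump that does not write `ra`, so `a0`–`a2` still carry the three arguments, and the `ra` just reloaded from the next task's context becomes the address `finish_switch` ultimately returns through. For the `tls` branch above, the thread-pointer helpers are presumably shaped like this (a sketch with assumed names, riscv64 only):

```rust
use core::arch::asm;

// Assumed shape of `read_thread_pointer`: copy the `tp` register out.
fn read_thread_pointer() -> usize {
    let tp: usize;
    unsafe { asm!("mv {}, tp", out(reg) tp) };
    tp
}

// Assumed shape of `write_thread_pointer`: install a new `tp` value.
// Unsafe because it changes what thread-local accesses resolve to.
unsafe fn write_thread_pointer(tp: usize) {
    asm!("mv tp, {}", in(reg) tp)
}
```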
17 changes: 11 additions & 6 deletions modules/axhal/src/arch/x86_64/context.rs
@@ -181,7 +181,7 @@ impl TaskContext {
///
/// It first saves the current task's context from CPU to this place, and then
/// restores the next task's context from `next_ctx` to CPU.
-pub fn switch_to(&mut self, next_ctx: &Self) {
+pub fn switch_to(&mut self, next_ctx: &Self, current_task_on_cpu_ptr: &mut bool) {
#[cfg(feature = "fp_simd")]
{
self.ext_state.save();
Expand All @@ -192,12 +192,16 @@ impl TaskContext {
self.fs_base = super::read_thread_pointer();
unsafe { super::write_thread_pointer(next_ctx.fs_base) };
}
-unsafe { context_switch(&mut self.rsp, &next_ctx.rsp) }
+unsafe { context_switch(self, next_ctx, current_task_on_cpu_ptr) }
}
}

#[naked]
unsafe extern "C" fn context_switch(_current_stack: &mut u64, _next_stack: &u64) {
unsafe extern "C" fn context_switch(
_current_task: &mut TaskContext,
_next_task: &TaskContext,
current_task_on_cpu_ptr: &mut bool,
) {
asm!(
"
push rbp
Expand All @@ -206,16 +210,17 @@ unsafe extern "C" fn context_switch(_current_stack: &mut u64, _next_stack: &u64)
push r13
push r14
push r15
-mov [rdi], rsp
+mov [rdi + {task_sp_ptr}], rsp
-mov rsp, [rsi]
+mov rsp, [rsi + {task_sp_ptr}]
pop r15
pop r14
pop r13
pop r12
pop rbx
pop rbp
ret",
jmp finish_switch",
task_sp_ptr = const core::mem::offset_of!(TaskContext, rsp),
options(noreturn),
)
}
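On x86_64 the routine previously took raw `&mut u64` stack-pointer references; now it takes whole `TaskContext` references and lets the assembly locate the `rsp` field itself, by passing `core::mem::offset_of!` as a `const` operand. A self-contained sketch of that addressing trick with an invented struct (x86_64 only; `offset_of!` and `const` asm operands need a recent or nightly toolchain, as used here):

```rust
use core::arch::asm;
use core::mem::offset_of;

#[repr(C)]
struct Ctx {
    kstack_top: u64, // something before `rsp`, so the offset is non-zero
    rsp: u64,
}

// Store the current stack pointer into `(*ctx).rsp`, addressing the field
// by its compile-time byte offset, as `context_switch` above does.
unsafe fn save_rsp(ctx: *mut Ctx) {
    asm!(
        "mov [{ctx} + {rsp_off}], rsp",
        ctx = in(reg) ctx,
        rsp_off = const offset_of!(Ctx, rsp),
    );
}
```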
19 changes: 8 additions & 11 deletions modules/axtask/src/run_queue.rs
@@ -284,11 +284,7 @@ impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {

pub fn blocked_resched(&mut self) {
let curr = crate::current();
-assert!(
-curr.is_blocking(),
-"task is not blocking, {:?}",
-curr.state()
-);
+assert!(curr.is_blocked(), "task is not blocked, {:?}", curr.state());

debug!("task block: {}", curr.id_name());
self.inner.resched(false);
Expand Down Expand Up @@ -323,7 +319,7 @@ impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> {
let now = axhal::time::wall_time();
if now < deadline {
crate::timers::set_alarm_wakeup(deadline, curr.clone());
-curr.set_state(TaskState::Blocking);
+curr.set_state(TaskState::Blocked);
self.inner.resched(false);
}
}
Expand All @@ -341,10 +337,6 @@ impl AxRunQueue {
}
}

-if prev.is_blocking() {
-prev.set_state(TaskState::Blocked);
-}

let next = self.scheduler.pick_next_task().unwrap_or_else(|| unsafe {
// Safety: IRQs must be disabled at this time.
IDLE_TASK.current_ref_raw().get_unchecked().clone()
Expand All @@ -369,6 +361,9 @@ impl AxRunQueue {
#[cfg(feature = "preempt")]
next_task.set_preempt_pending(false);
next_task.set_state(TaskState::Running);
+// Claim the task as running, we do this before switching to it
+// such that any running task will have this set.
+next_task.set_on_cpu(true);
if prev_task.ptr_eq(&next_task) {
return;
}
Expand All @@ -377,14 +372,16 @@ impl AxRunQueue {
let prev_ctx_ptr = prev_task.ctx_mut_ptr();
let next_ctx_ptr = next_task.ctx_mut_ptr();

+let prev_on_cpu_ptr = prev_task.on_cpu_mut_ptr();

// The strong reference count of `prev_task` will be decremented by 1,
// but won't be dropped until `gc_entry()` is called.
assert!(Arc::strong_count(prev_task.as_task_ref()) > 1);
assert!(Arc::strong_count(&next_task) >= 1);

CurrentTask::set_current(prev_task, next_task);

-(*prev_ctx_ptr).switch_to(&*next_ctx_ptr);
+(*prev_ctx_ptr).switch_to(&*next_ctx_ptr, &mut *prev_on_cpu_ptr);
}
}
}
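The ordering this function establishes is the heart of the commit: `next_task.set_on_cpu(true)` happens before the switch, so any task observed in the `Running` state always has `on_cpu` set, while `prev`'s flag is cleared only by `finish_switch`, running on the next task's stack, once `prev`'s stack can no longer be referenced. A waker that spins until `on_cpu` reads `false` (Acquire, pairing with the Release store) may then requeue the task on any CPU. A minimal runnable model of that handshake, with OS threads standing in for CPUs (a sketch, not the kernel code):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // Stands in for `TaskInner::on_cpu` of a task being switched out.
    let on_cpu = Arc::new(AtomicBool::new(true));

    let old_cpu = {
        let on_cpu = Arc::clone(&on_cpu);
        thread::spawn(move || {
            // ... the context switch completes; the previous task's stack
            // is no longer in use. Pairs with the Acquire load below
            // (cf. `finish_switch`).
            on_cpu.store(false, Ordering::Release);
        })
    };

    // Waker side (cf. the unblock path in task.rs): wait until the owning
    // CPU has fully switched away before moving the task elsewhere.
    while on_cpu.load(Ordering::Acquire) {
        std::hint::spin_loop();
    }
    // The task may now be queued on any run queue.
    old_cpu.join().unwrap();
}
```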
71 changes: 54 additions & 17 deletions modules/axtask/src/task.rs
@@ -29,14 +29,11 @@ pub(crate) enum TaskState {
Running = 1,
/// Task is ready to run on some scheduler's ready queue.
Ready = 2,
-/// Task is just be blocked and inserted into the wait queue,
-/// but still have **NOT finished** its scheduling process.
-Blocking = 3,
/// Task is blocked (in the wait queue or timer list),
/// and it has finished its scheduling process; it can be woken up by `notify()` on any run queue safely.
-Blocked = 4,
+Blocked = 3,
/// Task is exited and waiting for being dropped.
-Exited = 5,
+Exited = 4,
}

/// The inner task structure.
Expand All @@ -55,6 +52,8 @@ pub struct TaskInner {
in_wait_queue: AtomicBool,
#[cfg(feature = "irq")]
in_timer_list: AtomicBool,
+/// Used to indicate whether the task is running on a CPU.
+on_cpu: AtomicBool,

/// Used to protect the task from being unblocked by timer and `notify()` at the same time.
/// It is used in `unblock_task()`, which is called by wait queue's `notify()` and timer's callback.
@@ -96,9 +95,8 @@ impl From<u8> for TaskState {
match state {
1 => Self::Running,
2 => Self::Ready,
-3 => Self::Blocking,
-4 => Self::Blocked,
-5 => Self::Exited,
+3 => Self::Blocked,
+4 => Self::Exited,
_ => unreachable!(),
}
}
@@ -196,6 +194,7 @@ impl TaskInner {
in_wait_queue: AtomicBool::new(false),
#[cfg(feature = "irq")]
in_timer_list: AtomicBool::new(false),
+on_cpu: AtomicBool::new(false),
#[cfg(feature = "irq")]
unblock_lock: SpinRaw::new(()),
#[cfg(feature = "preempt")]
@@ -259,11 +258,6 @@ impl TaskInner {
matches!(self.state(), TaskState::Blocked)
}

-#[inline]
-pub(crate) fn is_blocking(&self) -> bool {
-matches!(self.state(), TaskState::Blocking)
-}

#[inline]
pub(crate) const fn is_init(&self) -> bool {
self.is_init
@@ -309,13 +303,21 @@ impl TaskInner {
where
F: FnMut(),
{
-// When task's state is Blocking, it has not finished its scheduling process.
-if self.is_blocking() {
-while self.is_blocking() {
+debug!("{} unblocking", self.id_name());
+
+// If the owning (remote) CPU is still in the middle of schedule() with
+// this task as prev, wait until it's done referencing the task.
+//
+// Pairs with the `finish_switch()`.
+//
+// This ensures that tasks getting woken will be fully ordered against
+// their previous state and preserve Program Order.
+if self.on_cpu() {
+while self.on_cpu() {
// Wait for the task to finish its scheduling process.
core::hint::spin_loop();
}
-assert!(self.is_blocked())
+assert!(!self.on_cpu())
}

// When irq is enabled, use `unblock_lock` to protect the task from being unblocked by timer and `notify()` at the same time.
@@ -381,6 +383,27 @@ impl TaskInner {
self.ctx.get_mut()
}

+/// Returns the raw pointer to the `on_cpu` field.
+#[inline]
+pub(crate) const fn on_cpu_mut_ptr(&self) -> *mut bool {
+self.on_cpu.as_ptr()
+}
+
+/// Sets whether the task is running on a CPU.
+pub fn set_on_cpu(&self, on_cpu: bool) {
+self.on_cpu.store(on_cpu, Ordering::Release)
+}
+
+/// Returns whether the task is running on a CPU.
+///
+/// It is used to protect the task from being moved to a different run queue
+/// while it has not finished its scheduling process.
+/// The `on_cpu` field is set to `true` when the task is preparing to run on a CPU,
+/// and it is set to `false` when the task has finished its scheduling process in `finish_switch`.
+pub fn on_cpu(&self) -> bool {
+self.on_cpu.load(Ordering::Acquire)
+}

/// Returns the top address of the kernel stack.
#[inline]
pub const fn kernel_stack_top(&self) -> Option<VirtAddr> {
@@ -499,3 +522,17 @@ extern "C" fn task_entry() -> ! {
}
crate::exit(0);
}

+#[no_mangle]
+unsafe extern "C" fn finish_switch(
+_prev_task: &mut TaskContext,
+_next_task: &TaskContext,
+_prev_task_on_cpu_ptr: *mut bool,
+) {
+// This must be the very last reference to @_prev_task from this CPU. After
+// `on_cpu` is cleared, the task can be moved to a different CPU. We
+// must ensure this doesn't happen until the switch is completely
+// finished.
+AtomicBool::from_ptr(_prev_task_on_cpu_ptr).store(false, Ordering::Release);
+// ret to next task
+}
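`on_cpu_mut_ptr` and `finish_switch` round-trip the flag through a raw `*mut bool`: `AtomicBool::as_ptr` (stable since Rust 1.70) yields a raw pointer that survives the `extern "C"`/assembly boundary, and `AtomicBool::from_ptr` (stable since 1.75) rebuilds an atomic view for the Release store on the far side. In miniature:

```rust
use core::sync::atomic::{AtomicBool, Ordering};

fn main() {
    let on_cpu = AtomicBool::new(true);

    // Handed across the asm boundary as a plain pointer
    // (cf. `on_cpu_mut_ptr`).
    let raw: *mut bool = on_cpu.as_ptr();

    // Rebuilt on the other side (cf. `finish_switch`).
    // Safety: `raw` comes from a live, properly aligned AtomicBool.
    unsafe { AtomicBool::from_ptr(raw) }.store(false, Ordering::Release);

    assert!(!on_cpu.load(Ordering::Acquire));
}
```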
15 changes: 3 additions & 12 deletions modules/axtask/src/wait_queue.rs
@@ -74,16 +74,7 @@ impl WaitQueue {
#[cfg(feature = "preempt")]
assert!(curr.can_preempt(2));

-// We set task state as `Blocking` to clarify that the task is blocked
-// but **still NOT** finished its scheduling process.
-//
-// When another task (generally on another run queue) try to unblock this task,
-// * if this task's state is still `Blocking`:
-//   it needs to wait for this task's state to be changed to `Blocked`, which means it has finished its scheduling process.
-// * if this task's state is `Blocked`:
-//   it means this task is blocked and finished its scheduling process, in another word, it has left current run queue,
-//   so this task can be scheduled on any run queue.
-curr.set_state(TaskState::Blocking);
+curr.set_state(TaskState::Blocked);
curr.set_in_wait_queue(true);

debug!("{} push to wait queue", curr.id_name());
@@ -128,7 +119,7 @@ impl WaitQueue {
assert!(curr.can_preempt(2));
wq.push_back(curr.clone());

-curr.set_state(TaskState::Blocking);
+curr.set_state(TaskState::Blocked);
curr.set_in_wait_queue(true);
drop(wq);

@@ -196,7 +187,7 @@ impl WaitQueue {
assert!(curr.can_preempt(2));
wq.push_back(curr.clone());

-curr.set_state(TaskState::Blocking);
+curr.set_state(TaskState::Blocked);
curr.set_in_wait_queue(true);
drop(wq);

