Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update test_scheduler #1045

Merged
merged 6 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 14 additions & 15 deletions applications/test_scheduler/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
[package]
name = "test_scheduler"
version = "0.1.0"
authors = ["Namitha Liyanage <[email protected]>"]
authors = ["Klim Tsoutsman <[email protected]>"]
description = "An application to test the scheduler"
edition = "2021"

[dependencies]
app_io = { path = "../../kernel/app_io" }
cpu = { path = "../../kernel/cpu" }
log = "0.4.8"
random = { path = "../../kernel/random" }
spawn = { path = "../../kernel/spawn" }
sync_block = { path = "../../kernel/sync_block" }
task = { path = "../../kernel/task" }

[dependencies.log]
version = "0.4.8"

[dependencies.spawn]
path = "../../kernel/spawn"

[dependencies.scheduler]
path = "../../kernel/scheduler"

[dependencies.task]
path = "../../kernel/task"

[dependencies.cpu]
path = "../../kernel/cpu"
[dependencies.rand]
version = "0.8.5"
default-features = false
features = ["small_rng"]
181 changes: 127 additions & 54 deletions applications/test_scheduler/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1,151 @@
#![no_std]

#[macro_use] extern crate log;
extern crate alloc;
extern crate spawn;
extern crate scheduler;
extern crate task;
extern crate cpu;

use core::convert::TryFrom;

use alloc::string::String;
use alloc::vec::Vec;
use cpu::CpuId;
use alloc::{format, string::String, vec::Vec};
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

use app_io::println;
use cpu::{cpus, CpuId};
use rand::seq::SliceRandom;
use sync_block::RwLock;
use task::TaskRef;

pub fn main(_args: Vec<String>) -> isize {
let cpu_1 = CpuId::try_from(1).expect("CPU ID 1 did not exist");

let taskref1 = spawn::new_task_builder(worker, ())
.name(String::from("test1"))
.pin_on_cpu(cpu_1)
.spawn().expect("failed to initiate task");
println!("testing pinned");
test_pinned();
println!("testing unpinned");
test_unpinned();
0
}

if !scheduler::set_priority(&taskref1, 30) {
error!("scheduler_eval(): Could not set priority to taskref1");
// Spawn a bunch of pinned tasks, and then each pinned task randomly blocks and
// unblocks other tasks than are pinned to the same CPU.
//
// The tasks must be pinned to the same CPU to avoid a deadlock where two tasks
// on different CPUs block each other at the same time and then yield.
pub fn test_pinned() {
static TASKS: RwLock<Vec<(CpuId, Vec<TaskRef>)>> = RwLock::new(Vec::new());
static READY: AtomicBool = AtomicBool::new(false);

let tasks = cpus()
.map(|cpu| {
(
#[allow(clippy::clone_on_copy)]
cpu.clone(),
(0..100)
.map(move |id| {
spawn::new_task_builder(pinned_worker, cpu)
.name(format!("test-scheduler-pinned-{cpu}-{id}"))
.pin_on_cpu(cpu)
.block()
.spawn()
.expect("failed to spawn task")
})
.collect::<Vec<_>>(),
)
})
.collect::<Vec<_>>();

*TASKS.write() = tasks
.iter()
.map(|(cpu, task_iter)| (*cpu, task_iter.iter().map(|task| (*task).clone()).collect()))
.collect();

for (_, task_list) in tasks.iter() {
for task in task_list {
task.unblock().unwrap();
}
}

debug!("Spawned Task 1");

let taskref2 = spawn::new_task_builder(worker, ())
.name(String::from("test2"))
.pin_on_cpu(cpu_1)
.spawn().expect("failed to initiate task");
READY.store(true, Ordering::Release);

if !scheduler::set_priority(&taskref2, 20) {
error!("scheduler_eval(): Could not set priority to taskref2");
for (_, task_list) in tasks {
for task in task_list {
task.join().unwrap();
}
}

debug!("Spawned Task 2");

let taskref3 = spawn::new_task_builder(worker, ())
.name(String::from("test3"))
.pin_on_cpu(cpu_1)
.spawn().expect("failed to initiate task");

if !scheduler::set_priority(&taskref3, 10) {
error!("scheduler_eval(): Could not set priority to taskref3");
// We have to drop the tasks so that the `test-scheduler` crate can be dropped.
*TASKS.write() = Vec::new();

fn pinned_worker(pinned_cpu: CpuId) {
let mut rng = random::init_rng::<rand::rngs::SmallRng>().unwrap();
while !READY.load(Ordering::Acquire) {
core::hint::spin_loop();
}

let locked = TASKS.read();
let tasks = &locked.iter().find(|(cpu, _)| *cpu == pinned_cpu).unwrap().1;
for _ in 0..100 {
assert_eq!(
cpu::current_cpu(),
pinned_cpu,
"pinned worker migrated cores"
);

let random_task = tasks.choose(&mut rng).unwrap();

let chose_self =
task::with_current_task(|current_task| random_task == current_task).unwrap();
if chose_self {
continue;
}

let _ = random_task.block();
task::schedule();
let _ = random_task.unblock();
}
}
}

debug!("Spawned Task 3");
/// Spawn a bunch of unpinned tasks, and then block and unblock random tasks
/// from the main thread.
pub fn test_unpinned() {
const NUM_TASKS: usize = 500;

static READY: AtomicBool = AtomicBool::new(false);
static NUM_RUNNING: AtomicUsize = AtomicUsize::new(NUM_TASKS);

let tasks = (0..NUM_TASKS)
.map(move |id| {
spawn::new_task_builder(unpinned_worker, ())
.name(format!("test-scheduler-unpinned-{id}"))
.block()
.spawn()
.expect("failed to spawn task")
})
.collect::<Vec<_>>();

for task in tasks.iter() {
task.unblock().unwrap();
}

debug!("Spawned all tasks");
READY.store(true, Ordering::Release);

let _priority1 = scheduler::priority(&taskref1);
let _priority2 = scheduler::priority(&taskref2);
let _priority3 = scheduler::priority(&taskref3);
// Cause some mayhem.
let mut rng = random::init_rng::<rand::rngs::SmallRng>().unwrap();
while NUM_RUNNING.load(Ordering::Relaxed) != 0 {
let random_task = tasks.choose(&mut rng).unwrap();
let _ = random_task.block();
// Let the worker tasks on this core run.
task::schedule();
let _ = random_task.unblock();
}

#[cfg(epoch_scheduler)]
{
assert_eq!(_priority1,Some(30));
assert_eq!(_priority2,Some(20));
assert_eq!(_priority3,Some(10));
for task in tasks {
task.join().unwrap();
}

taskref1.join().expect("Task 1 join failed");
taskref2.join().expect("Task 2 join failed");
taskref3.join().expect("Task 3 join failed");
fn unpinned_worker(_: ()) {
while !READY.load(Ordering::Acquire) {
core::hint::spin_loop();
}

0
}
for _ in 0..1000 {
task::schedule();
}

fn worker(_: ()) {
for i in 0..1000 {
debug!("Task_ID : {} , Instance : {}", task::get_my_current_task_id(), i);
scheduler::schedule();
NUM_RUNNING.fetch_sub(1, Ordering::Relaxed);
}
}
5 changes: 5 additions & 0 deletions kernel/cpu/src/aarch64.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ pub fn register_cpu(bootstrap: bool) -> Result<(), &'static str> {
}
}

// Returns an iterator over the available CPUs.
pub fn cpus() -> impl Iterator<Item = CpuId> {
ONLINE_CPUS.read().clone().into_iter()
}

/// Returns the number of CPUs (SMP cores) that exist and
/// are currently initialized on this system.
pub fn cpu_count() -> u32 {
Expand Down
4 changes: 4 additions & 0 deletions kernel/cpu/src/x86_64.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ impl TryFrom<u32> for CpuId {
}
}

// Returns an iterator over the available CPUs.
pub fn cpus() -> impl Iterator<Item = CpuId> {
apic::get_lapics().iter().map(|(apic_id, _)| (*apic_id).into())
}

/// Returns the number of CPUs (SMP cores) that exist and
/// are currently initialized on this system.
Expand Down
4 changes: 2 additions & 2 deletions kernel/task_struct/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ impl Task {
if self.runstate.compare_exchange(Runnable, Blocked).is_ok() {
Ok(Runnable)
} else if self.runstate.compare_exchange(Blocked, Blocked).is_ok() {
warn!("Blocked an already blocked task: {:?}", self);
// warn!("Blocked an already blocked task: {:?}", self);
Ok(Blocked)
} else {
Err(self.runstate.load())
Expand Down Expand Up @@ -510,7 +510,7 @@ impl Task {
if self.runstate.compare_exchange(Blocked, Runnable).is_ok() {
Ok(Blocked)
} else if self.runstate.compare_exchange(Runnable, Runnable).is_ok() {
warn!("Unblocked an already runnable task: {:?}", self);
// warn!("Unblocked an already runnable task: {:?}", self);
Ok(Runnable)
} else {
Err(self.runstate.load())
Expand Down