Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: EXC-1787: Fix scheduler AP divergence #2563

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 65 additions & 19 deletions rs/execution_environment/src/scheduler/round_schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ impl RoundSchedule {
idx = (idx + 1) % self.scheduler_cores;
}

// for (i, core) in canisters_partitioned_by_cores.iter().enumerate() {
// let ids: Vec<_> = core
// .iter()
// .map(|c| c.canister_id().get().as_slice()[7])
// .collect();
// println!("XXX core {i}: {ids:?}");
// }
(canisters_partitioned_by_cores, canisters)
}

Expand Down Expand Up @@ -317,21 +324,43 @@ impl RoundSchedule {
let mut total_charged_priority = 0;
for canister_id in fully_executed_canister_ids {
if let Some(canister) = canister_states.get_mut(&canister_id) {
total_charged_priority += 100 * multiplier;
canister.scheduler_state.priority_credit += (100 * multiplier).into();
// Charge the canister only while its accumulated priority is at least
// -100 (in multiplied percent), i.e. `-100 * multiplier`.
if canister.scheduler_state.accumulated_priority >= (-100 * multiplier).into() {
total_charged_priority += 100 * multiplier;
// println!(
// "XXX - {} RP:{:>9} AP:{:>9} PC:{} + {}",
// canister.canister_id().get().as_slice()[7],
// canister.scheduler_state.accumulated_priority.get()
// - canister.scheduler_state.priority_credit.get(),
// canister.scheduler_state.accumulated_priority.get(),
// canister.scheduler_state.priority_credit.get(),
// 100 * multiplier
// );
canister.scheduler_state.priority_credit += (100 * multiplier).into();
}
}
}

let total_allocated = self.total_compute_allocation_percent * multiplier;
// Free capacity per canister in multiplied percent.
let free_capacity_per_canister = total_charged_priority.saturating_sub(total_allocated)
/ number_of_canisters.max(1) as i64;
// println!(
// "XXX FIN num canisters:{} total_charged_priority:{} mult:{} free/canister:{}",
// number_of_canisters, total_charged_priority, multiplier, free_capacity_per_canister
// );
// Fully divide the free allocation across all canisters.
for canister in canister_states.values_mut() {
// De-facto compute allocation includes bonus allocation
let factual = canister.scheduler_state.compute_allocation.as_percent() as i64
* multiplier
+ free_capacity_per_canister;
// println!(
// "XXX + {} AP:{:>9} + {}",
// canister.canister_id().get().as_slice()[7],
// canister.scheduler_state.accumulated_priority.get(),
// factual,
// );
// Increase accumulated priority by de-facto compute allocation.
canister.scheduler_state.accumulated_priority += factual.into();

Expand Down Expand Up @@ -417,7 +446,10 @@ impl RoundSchedule {
let mut accumulated_priority_deviation = 0;
for (&canister_id, canister) in canister_states.iter_mut() {
if is_reset_round {
canister.scheduler_state.accumulated_priority = Default::default();
// By default, each canister's accumulated priority is set to its compute
// allocation (in multiplied percent).
canister.scheduler_state.accumulated_priority =
(canister.scheduler_state.compute_allocation.as_percent() as i64 * multiplier)
.into();
canister.scheduler_state.priority_credit = Default::default();
}

Expand Down Expand Up @@ -475,8 +507,12 @@ impl RoundSchedule {
// De-facto compute allocation includes bonus allocation
let factual =
rs.compute_allocation.as_percent() as i64 * multiplier + free_capacity_per_canister;
// Increase accumulated priority by de-facto compute allocation.
rs.accumulated_priority += factual.into();
// println!(
// "XXX +++ {} AP:{:>9} + {}",
// rs.canister_id.get().as_slice()[7],
// rs.accumulated_priority.get(),
// factual,
// );
// Count long executions and sum up their compute allocation.
if rs.has_aborted_or_paused_execution {
// Note: factual compute allocation is multiplied by `multiplier`
Expand All @@ -492,10 +528,7 @@ impl RoundSchedule {
canister.canister_id() == rs.canister_id,
"Elements in canister_states and round_states must follow in the same order",
);
// Update canister state with a new accumulated_priority.
canister.scheduler_state.accumulated_priority = rs.accumulated_priority;

// Record a canister metric.
if !canister.has_input() {
canister
.system_state
Expand All @@ -511,6 +544,14 @@ impl RoundSchedule {
// by the `multiplier`.
let long_execution_cores = ((long_executions_compute_allocation + 100 * multiplier - 1)
/ (100 * multiplier)) as usize;
// println!(
// "XXX ASS num canisters:{} capacity:{}% mult:{} free/canister:{} LEC:{}",
// number_of_canisters,
// compute_capacity_percent,
// multiplier,
// free_capacity_per_canister,
// long_execution_cores
// );
// If there are long executions, the `long_execution_cores` must be non-zero.
debug_assert_or_critical_error!(
number_of_long_executions == 0 || long_execution_cores > 0,
Expand Down Expand Up @@ -561,17 +602,18 @@ impl RoundSchedule {
let active_cores = scheduler_cores.min(number_of_canisters);
for (i, canister_id) in scheduling_order.take(active_cores).enumerate() {
let canister_state = canister_states.get_mut(canister_id).unwrap();
// As top `scheduler_cores` canisters are guaranteed to be scheduled
// this round, their accumulated priorities must be decreased here
// by `capacity * multiplier / scheduler_cores`. But instead this
// value is accumulated in the `priority_credit`, and applied later:
// * For short executions, the `priority_credit` is deducted from
// the `accumulated_priority` at the end of the round.
// * For long executions, the `priority_credit` is accumulated
// for a few rounds, and deducted from the `accumulated_priority`
// at the end of the long execution.
canister_state.scheduler_state.priority_credit +=
(compute_capacity_percent * multiplier / active_cores as i64).into();
// println!(
// "XXX --- {} AP:{:>9} PC:{} LEM:{}",
// canister_state.canister_id().get().as_slice()[7],
// canister_state.scheduler_state.accumulated_priority.get(),
// canister_state.scheduler_state.priority_credit.get(),
// // (compute_capacity_percent * multiplier / active_cores as i64),
// if i < round_schedule.long_execution_cores {
// 1
// } else {
// 0
// }
// );
if i < round_schedule.long_execution_cores {
canister_state.scheduler_state.long_execution_mode =
LongExecutionMode::Prioritized;
Expand All @@ -588,6 +630,10 @@ impl RoundSchedule {
// Aborting a long-running execution moves the canister to the
// default execution mode because the canister does not have a
// pending execution anymore.
// println!(
// "XXX apply_priority_credit accumulated_priority:{} priority_credit:{}",
// canister.scheduler_state.accumulated_priority, canister.scheduler_state.priority_credit
// );
canister.scheduler_state.long_execution_mode = LongExecutionMode::default();
}
}
Loading