From 4457883f90809bb9b9584369c18fdc31c6564146 Mon Sep 17 00:00:00 2001 From: Andriy Berestovskyy Date: Tue, 12 Nov 2024 09:05:07 +0100 Subject: [PATCH 1/4] fix: EXC-1787: Fix scheduler AP divergence --- .../src/scheduler/round_schedule.rs | 84 +++++++-- .../src/scheduler/tests.rs | 174 +++++++++++++++++- 2 files changed, 236 insertions(+), 22 deletions(-) diff --git a/rs/execution_environment/src/scheduler/round_schedule.rs b/rs/execution_environment/src/scheduler/round_schedule.rs index ac658f4dd34..55ca5dd7202 100644 --- a/rs/execution_environment/src/scheduler/round_schedule.rs +++ b/rs/execution_environment/src/scheduler/round_schedule.rs @@ -274,6 +274,13 @@ impl RoundSchedule { idx = (idx + 1) % self.scheduler_cores; } + // for (i, core) in canisters_partitioned_by_cores.iter().enumerate() { + // let ids: Vec<_> = core + // .iter() + // .map(|c| c.canister_id().get().as_slice()[7]) + // .collect(); + // println!("XXX core {i}: {ids:?}"); + // } (canisters_partitioned_by_cores, canisters) } @@ -317,8 +324,20 @@ impl RoundSchedule { let mut total_charged_priority = 0; for canister_id in fully_executed_canister_ids { if let Some(canister) = canister_states.get_mut(&canister_id) { - total_charged_priority += 100 * multiplier; - canister.scheduler_state.priority_credit += (100 * multiplier).into(); + // Cap accumulated priority to -100. + if canister.scheduler_state.accumulated_priority >= (-100 * multiplier).into() { + total_charged_priority += 100 * multiplier; + // println!( + // "XXX - {} RP:{:>9} AP:{:>9} PC:{} + {}", + // canister.canister_id().get().as_slice()[7], + // canister.scheduler_state.accumulated_priority.get() + // - canister.scheduler_state.priority_credit.get(), + // canister.scheduler_state.accumulated_priority.get(), + // canister.scheduler_state.priority_credit.get(), + // 100 * multiplier + // ); + canister.scheduler_state.priority_credit += (100 * multiplier).into(); + } } } @@ -326,12 +345,22 @@ impl RoundSchedule { // Free capacity per canister in multiplied percent. let free_capacity_per_canister = total_charged_priority.saturating_sub(total_allocated) / number_of_canisters.max(1) as i64; + // println!( + // "XXX FIN num canisters:{} total_charged_priority:{} mult:{} free/canister:{}", + // number_of_canisters, total_charged_priority, multiplier, free_capacity_per_canister + // ); // Fully divide the free allocation across all canisters. for canister in canister_states.values_mut() { // De-facto compute allocation includes bonus allocation let factual = canister.scheduler_state.compute_allocation.as_percent() as i64 * multiplier + free_capacity_per_canister; + // println!( + // "XXX + {} AP:{:>9} + {}", + // canister.canister_id().get().as_slice()[7], + // canister.scheduler_state.accumulated_priority.get(), + // factual, + // ); // Increase accumulated priority by de-facto compute allocation. canister.scheduler_state.accumulated_priority += factual.into(); @@ -417,7 +446,10 @@ impl RoundSchedule { let mut accumulated_priority_deviation = 0; for (&canister_id, canister) in canister_states.iter_mut() { if is_reset_round { - canister.scheduler_state.accumulated_priority = Default::default(); + // By default, each canister accumulated priority is set to its compute allocation. + canister.scheduler_state.accumulated_priority = + (canister.scheduler_state.compute_allocation.as_percent() as i64 * multiplier) + .into(); canister.scheduler_state.priority_credit = Default::default(); } @@ -475,8 +507,12 @@ impl RoundSchedule { // De-facto compute allocation includes bonus allocation let factual = rs.compute_allocation.as_percent() as i64 * multiplier + free_capacity_per_canister; - // Increase accumulated priority by de-facto compute allocation. - rs.accumulated_priority += factual.into(); + // println!( + // "XXX +++ {} AP:{:>9} + {}", + // rs.canister_id.get().as_slice()[7], + // rs.accumulated_priority.get(), + // factual, + // ); // Count long executions and sum up their compute allocation. if rs.has_aborted_or_paused_execution { // Note: factual compute allocation is multiplied by `multiplier` @@ -492,10 +528,7 @@ impl RoundSchedule { canister.canister_id() == rs.canister_id, "Elements in canister_states and round_states must follow in the same order", ); - // Update canister state with a new accumulated_priority. - canister.scheduler_state.accumulated_priority = rs.accumulated_priority; - // Record a canister metric. if !canister.has_input() { canister .system_state @@ -511,6 +544,14 @@ impl RoundSchedule { // by the `multiplier`. let long_execution_cores = ((long_executions_compute_allocation + 100 * multiplier - 1) / (100 * multiplier)) as usize; + // println!( + // "XXX ASS num canisters:{} capacity:{}% mult:{} free/canister:{} LEC:{}", + // number_of_canisters, + // compute_capacity_percent, + // multiplier, + // free_capacity_per_canister, + // long_execution_cores + // ); // If there are long executions, the `long_execution_cores` must be non-zero. debug_assert_or_critical_error!( number_of_long_executions == 0 || long_execution_cores > 0, @@ -561,17 +602,18 @@ impl RoundSchedule { let active_cores = scheduler_cores.min(number_of_canisters); for (i, canister_id) in scheduling_order.take(active_cores).enumerate() { let canister_state = canister_states.get_mut(canister_id).unwrap(); - // As top `scheduler_cores` canisters are guaranteed to be scheduled - // this round, their accumulated priorities must be decreased here - // by `capacity * multiplier / scheduler_cores`. But instead this - // value is accumulated in the `priority_credit`, and applied later: - // * For short executions, the `priority_credit` is deducted from - // the `accumulated_priority` at the end of the round. - // * For long executions, the `priority_credit` is accumulated - // for a few rounds, and deducted from the `accumulated_priority` - // at the end of the long execution. - canister_state.scheduler_state.priority_credit += - (compute_capacity_percent * multiplier / active_cores as i64).into(); + // println!( + // "XXX --- {} AP:{:>9} PC:{} LEM:{}", + // canister_state.canister_id().get().as_slice()[7], + // canister_state.scheduler_state.accumulated_priority.get(), + // canister_state.scheduler_state.priority_credit.get(), + // // (compute_capacity_percent * multiplier / active_cores as i64), + // if i < round_schedule.long_execution_cores { + // 1 + // } else { + // 0 + // } + // ); if i < round_schedule.long_execution_cores { canister_state.scheduler_state.long_execution_mode = LongExecutionMode::Prioritized; @@ -588,6 +630,10 @@ impl RoundSchedule { // Aborting a long-running execution moves the canister to the // default execution mode because the canister does not have a // pending execution anymore. + // println!( + // "XXX apply_priority_credit accumulated_priority:{} priority_credit:{}", + // canister.scheduler_state.accumulated_priority, canister.scheduler_state.priority_credit + // ); canister.scheduler_state.long_execution_mode = LongExecutionMode::default(); } } diff --git a/rs/execution_environment/src/scheduler/tests.rs b/rs/execution_environment/src/scheduler/tests.rs index bed03d40a8e..50be574039a 100644 --- a/rs/execution_environment/src/scheduler/tests.rs +++ b/rs/execution_environment/src/scheduler/tests.rs @@ -1786,6 +1786,173 @@ fn max_canisters_per_round() { assert_eq!(executed_canisters, 200 + 2 * 5); } +fn run_scheduler_test( + rounds: usize, + scheduler_cores: usize, + canisters_with_no_cycles: usize, + active_canisters_with_no_cycles: usize, + canisters_with_cycles: usize, + active_canisters_with_cycles: usize, + canisters_with_long_executions: usize, + active_canisters_with_long_executions: usize, + short_execution_instructions: u64, + long_execution_instructions: u64, +) -> SchedulerTest { + let multiplier = scheduler_cores + * (canisters_with_no_cycles + canisters_with_cycles + canisters_with_long_executions); + + let mut test = SchedulerTestBuilder::new() + .with_scheduler_config(SchedulerConfig { + scheduler_cores, + ..SchedulerConfig::application_subnet() + }) + .build(); + + // Bump up the round number to 1. + test.execute_round(ExecutionRoundType::OrdinaryRound); + + let mut no_cycles_ids = vec![]; + for _ in 0..canisters_with_no_cycles { + let canister_id = test.create_canister_with( + Cycles::new(0), + ComputeAllocation::zero(), + MemoryAllocation::BestEffort, + None, + None, + None, + ); + no_cycles_ids.push(canister_id); + } + let mut cycles_ids = vec![]; + for _ in 0..canisters_with_cycles { + let canister_id = test.create_canister(); + cycles_ids.push(canister_id); + } + let mut long_ids = vec![]; + for _ in 0..canisters_with_long_executions { + let canister_id = test.create_canister(); + long_ids.push(canister_id); + } + + let mut max_ap = i64::MIN; + let mut min_ap = i64::MAX; + let mut inv = 0; + for r in 1..=rounds { + test.execute_round(ExecutionRoundType::OrdinaryRound); + + for id in no_cycles_ids.iter().take(active_canisters_with_no_cycles) { + test.send_ingress(*id, ingress(short_execution_instructions)); + } + for id in cycles_ids.iter().take(active_canisters_with_cycles) { + test.send_ingress(*id, ingress(short_execution_instructions)); + } + + for id in long_ids.iter().take(active_canisters_with_long_executions) { + test.send_ingress(*id, ingress(long_execution_instructions)); + } + + for canister in test.state().canisters_iter() { + min_ap = min_ap.min(canister.scheduler_state.accumulated_priority.get()); + max_ap = max_ap.max(canister.scheduler_state.accumulated_priority.get()); + inv += canister.scheduler_state.accumulated_priority.get() + - canister.scheduler_state.priority_credit.get(); + } + println!("XXX Round:{r} inv:{inv} min_ap:{min_ap}..max_ap:{max_ap}"); + for canister in test.state().canisters_iter() { + println!( + "XXX END {} RP:{:>9} AP:{:>9} PC:{:>6} ex:{:>6} SF:{:>4} LE:{} LEM:{:?} LFER:{:>4}", + canister.canister_id().get().as_slice()[7], + canister.scheduler_state.accumulated_priority.get() + - canister.scheduler_state.priority_credit.get(), + canister.scheduler_state.accumulated_priority.get(), + canister.scheduler_state.priority_credit.get(), + canister.system_state.canister_metrics.executed, + canister.system_state.canister_metrics.scheduled_as_first, + (canister.has_paused_execution() || canister.has_aborted_execution()) as i32, + canister.scheduler_state.long_execution_mode as i32, + canister.scheduler_state.last_full_execution_round + ); + } + assert_eq!(inv, 0); + // Allow up to 40x divergence for long executions (should be in fact 20x, as 40B/2B = 20). + assert!( + min_ap > -100 * 40 * multiplier as i64, + "Error checking min accumulated priority {} > {} (-100% * 40x * multiplier:{multiplier})", + min_ap, + -100 * 40 * multiplier as i64 + ); + assert!( + max_ap < 100 * 40 * multiplier as i64, + "Error checking max accumulated priority {} < {} (100% * 40x * multiplier:{multiplier})", + max_ap, + 100 * 40 * multiplier as i64 + ); + } + test +} + +#[test] +fn scheduler_accumulated_priority_divergence_many_short_executions() { + let scheduler_cores = 4; + + let canisters_with_no_cycles = 0; + let active_canisters_with_no_cycles = 0; + let canisters_with_cycles = scheduler_cores; + let active_canisters_with_cycles = scheduler_cores; + let short_execution_instructions = 13_000_000; + let canisters_with_long_executions = 1; + let active_canisters_with_long_executions = 1; + let long_execution_instructions = 40_000_000_000; + + let multiplier = scheduler_cores + * (canisters_with_no_cycles + canisters_with_cycles + canisters_with_long_executions); + let rounds = multiplier * 100; + + let _test = run_scheduler_test( + rounds, + scheduler_cores, + canisters_with_no_cycles, + active_canisters_with_no_cycles, + canisters_with_cycles, + active_canisters_with_cycles, + canisters_with_long_executions, + active_canisters_with_long_executions, + short_execution_instructions, + long_execution_instructions, + ); +} + +#[test] +fn scheduler_accumulated_priority_divergence_many_long_executions() { + let scheduler_cores = 4; + + let canisters_with_no_cycles = 0; + let active_canisters_with_no_cycles = 0; + let canisters_with_cycles = 1; + let active_canisters_with_cycles = 1; + let short_execution_instructions = 13_000_000; + let canisters_with_long_executions = scheduler_cores + 1; + let active_canisters_with_long_executions = scheduler_cores + 1; + let long_execution_instructions = 40_000_000_000; + + let multiplier = scheduler_cores + * (canisters_with_no_cycles + canisters_with_cycles + canisters_with_long_executions); + let rounds = multiplier * 100; + + let _test = run_scheduler_test( + rounds, + scheduler_cores, + canisters_with_no_cycles, + active_canisters_with_no_cycles, + canisters_with_cycles, + active_canisters_with_cycles, + canisters_with_long_executions, + active_canisters_with_long_executions, + short_execution_instructions, + long_execution_instructions, + ); +} + #[test] fn can_fully_execute_canisters_deterministically_until_out_of_cycles() { // In this test we have 5 canisters with 10 input messages each. The maximum @@ -4380,7 +4547,7 @@ fn scheduler_respects_compute_allocation( let replicated_state = test.state(); let number_of_canisters = replicated_state.canister_states.len(); let total_compute_allocation = replicated_state.total_compute_allocation(); - assert!(total_compute_allocation <= 100 * scheduler_cores as u64); + prop_assert!(total_compute_allocation <= 100 * scheduler_cores as u64); // Count, for each canister, how many times it is the first canister // to be executed by a thread. @@ -4393,7 +4560,8 @@ fn scheduler_respects_compute_allocation( let canister_ids: Vec<_> = test.state().canister_states.iter().map(|x| *x.0).collect(); - for _ in 0..number_of_rounds { + // Add one more round as we update the accumulated priorities at the end of the round now. + for _ in 0..=number_of_rounds { for canister_id in canister_ids.iter() { test.expect_heartbeat(*canister_id, instructions(B as u64)); } @@ -4421,7 +4589,7 @@ fn scheduler_respects_compute_allocation( number_of_rounds / 100 * compute_allocation + 1 }; - assert!( + prop_assert!( *count >= expected_count, "Canister {} (allocation {}) should have been scheduled \ {} out of {} rounds, was scheduled only {} rounds instead.", From ee46cc9c039c039bd79ecced8142e20d5a32d4d8 Mon Sep 17 00:00:00 2001 From: Andriy Berestovskyy Date: Tue, 12 Nov 2024 16:37:41 +0100 Subject: [PATCH 2/4] Reproduce scheduler unfairness for short executions --- .../src/scheduler/tests.rs | 92 +++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/rs/execution_environment/src/scheduler/tests.rs b/rs/execution_environment/src/scheduler/tests.rs index 50be574039a..3d385cf0126 100644 --- a/rs/execution_environment/src/scheduler/tests.rs +++ b/rs/execution_environment/src/scheduler/tests.rs @@ -1953,6 +1953,98 @@ fn scheduler_accumulated_priority_divergence_many_long_executions() { ); } +// To run: +// bazel test //rs/execution_environment:execution_environment_test --test_arg=--nocapture --test_arg=--include-ignored --test_output=streamed --test_arg=scheduler_unfairness +#[ignore] +#[test] +fn scheduler_unfairness() { + let scheduler_cores = 4; + + let interactive_canisters = scheduler_cores; + let active_interactive_canisters = scheduler_cores; + let batch_canisters = scheduler_cores * 2; + let active_batch_canisters = scheduler_cores * 2; + let batch_executions = 200; + let short_execution_instructions = 10_000_000; + + let rounds = 10; + + let mut test = SchedulerTestBuilder::new() + .with_scheduler_config(SchedulerConfig { + scheduler_cores, + ..SchedulerConfig::application_subnet() + }) + .build(); + + // Bump up the round number to 1. + test.execute_round(ExecutionRoundType::OrdinaryRound); + + let mut interactive_ids = vec![]; + for _ in 0..interactive_canisters { + let canister_id = test.create_canister(); + interactive_ids.push(canister_id); + } + let mut batch_ids = vec![]; + for _ in 0..batch_canisters { + let canister_id = test.create_canister(); + batch_ids.push(canister_id); + } + + let mut inv = 0; + for r in 1..=rounds { + test.execute_round(ExecutionRoundType::OrdinaryRound); + + for id in interactive_ids.iter().take(active_interactive_canisters) { + test.send_ingress(*id, ingress(short_execution_instructions)); + } + for id in batch_ids.iter().take(active_batch_canisters) { + for _ in 0..batch_executions { + test.send_ingress(*id, ingress(short_execution_instructions)); + } + } + + for canister in test.state().canisters_iter() { + inv += canister.scheduler_state.accumulated_priority.get() + - canister.scheduler_state.priority_credit.get(); + } + println!("XXX Round:{r} inv:{inv}"); + for canister in test.state().canisters_iter() { + println!( + "XXX END {} RP:{:>9} AP:{:>9} PC:{:>6} ex:{:>6} SF:{:>4} LE:{} LEM:{:?} LFER:{:>4}", + canister.canister_id().get().as_slice()[7], + canister.scheduler_state.accumulated_priority.get() + - canister.scheduler_state.priority_credit.get(), + canister.scheduler_state.accumulated_priority.get(), + canister.scheduler_state.priority_credit.get(), + canister.system_state.canister_metrics.executed, + canister.system_state.canister_metrics.scheduled_as_first, + (canister.has_paused_execution() || canister.has_aborted_execution()) as i32, + canister.scheduler_state.long_execution_mode as i32, + canister.scheduler_state.last_full_execution_round + ); + } + assert_eq!(inv, 0); + } + batch_ids + .iter() + .map(|canister_id| { + test.canister_state(*canister_id) + .system_state + .canister_metrics + .executed + }) + .reduce(|prev_executed, next_executed| { + assert!( + prev_executed < next_executed * 2, + "Error checking fairness for batch canisters: {} < {} *2", + prev_executed, + next_executed, + ); + next_executed + }) + .unwrap(); +} + #[test] fn can_fully_execute_canisters_deterministically_until_out_of_cycles() { // In this test we have 5 canisters with 10 input messages each. The maximum From d880b8efc56233f45ea2846c586b160ce6495c1e Mon Sep 17 00:00:00 2001 From: Andriy Berestovskyy Date: Tue, 12 Nov 2024 17:49:58 +0100 Subject: [PATCH 3/4] Try out Alin's suggestion --- .../src/scheduler/round_schedule.rs | 47 +++++++++---------- .../src/scheduler/tests.rs | 8 ++-- 2 files changed, 27 insertions(+), 28 deletions(-) diff --git a/rs/execution_environment/src/scheduler/round_schedule.rs b/rs/execution_environment/src/scheduler/round_schedule.rs index 55ca5dd7202..3a1e1141a07 100644 --- a/rs/execution_environment/src/scheduler/round_schedule.rs +++ b/rs/execution_environment/src/scheduler/round_schedule.rs @@ -324,20 +324,17 @@ impl RoundSchedule { let mut total_charged_priority = 0; for canister_id in fully_executed_canister_ids { if let Some(canister) = canister_states.get_mut(&canister_id) { - // Cap accumulated priority to -100. - if canister.scheduler_state.accumulated_priority >= (-100 * multiplier).into() { - total_charged_priority += 100 * multiplier; - // println!( - // "XXX - {} RP:{:>9} AP:{:>9} PC:{} + {}", - // canister.canister_id().get().as_slice()[7], - // canister.scheduler_state.accumulated_priority.get() - // - canister.scheduler_state.priority_credit.get(), - // canister.scheduler_state.accumulated_priority.get(), - // canister.scheduler_state.priority_credit.get(), - // 100 * multiplier - // ); - canister.scheduler_state.priority_credit += (100 * multiplier).into(); - } + total_charged_priority += 100 * multiplier; + // println!( + // "XXX - {} RP:{:>9} AP:{:>9} PC:{} + {}", + // canister.canister_id().get().as_slice()[7], + // canister.scheduler_state.accumulated_priority.get() + // - canister.scheduler_state.priority_credit.get(), + // canister.scheduler_state.accumulated_priority.get(), + // canister.scheduler_state.priority_credit.get(), + // 100 * multiplier + // ); + canister.scheduler_state.priority_credit += (100 * multiplier).into(); } } @@ -453,17 +450,19 @@ impl RoundSchedule { canister.scheduler_state.priority_credit = Default::default(); } + let accumulated_priority = canister.scheduler_state.accumulated_priority; + let good_standing = accumulated_priority >= 0.into(); let has_aborted_or_paused_execution = canister.has_aborted_execution() || canister.has_paused_execution(); let compute_allocation = canister.scheduler_state.compute_allocation; - let accumulated_priority = canister.scheduler_state.accumulated_priority; round_states.push(CanisterRoundState { canister_id, accumulated_priority, compute_allocation, long_execution_mode: canister.scheduler_state.long_execution_mode, - has_aborted_or_paused_execution, + // Treat long execution just like a best effort new execution. + has_aborted_or_paused_execution: good_standing && has_aborted_or_paused_execution, }); total_compute_allocation_percent += compute_allocation.as_percent() as i64; @@ -544,14 +543,14 @@ impl RoundSchedule { // by the `multiplier`. let long_execution_cores = ((long_executions_compute_allocation + 100 * multiplier - 1) / (100 * multiplier)) as usize; - // println!( - // "XXX ASS num canisters:{} capacity:{}% mult:{} free/canister:{} LEC:{}", - // number_of_canisters, - // compute_capacity_percent, - // multiplier, - // free_capacity_per_canister, - // long_execution_cores - // ); + println!( + "XXX ASS num canisters:{} capacity:{}% mult:{} free/canister:{} LEC:{}", + number_of_canisters, + compute_capacity_percent, + multiplier, + free_capacity_per_canister, + long_execution_cores + ); // If there are long executions, the `long_execution_cores` must be non-zero. debug_assert_or_critical_error!( number_of_long_executions == 0 || long_execution_cores > 0, diff --git a/rs/execution_environment/src/scheduler/tests.rs b/rs/execution_environment/src/scheduler/tests.rs index 3d385cf0126..adbb9e31554 100644 --- a/rs/execution_environment/src/scheduler/tests.rs +++ b/rs/execution_environment/src/scheduler/tests.rs @@ -1876,16 +1876,16 @@ fn run_scheduler_test( assert_eq!(inv, 0); // Allow up to 40x divergence for long executions (should be in fact 20x, as 40B/2B = 20). assert!( - min_ap > -100 * 40 * multiplier as i64, + min_ap > -100 * 40 * (active_canisters_with_long_executions * multiplier) as i64, "Error checking min accumulated priority {} > {} (-100% * 40x * multiplier:{multiplier})", min_ap, - -100 * 40 * multiplier as i64 + -100 * 40 * (active_canisters_with_long_executions * multiplier) as i64 ); assert!( - max_ap < 100 * 40 * multiplier as i64, + max_ap < 100 * 40 * (active_canisters_with_long_executions * multiplier) as i64, "Error checking max accumulated priority {} < {} (100% * 40x * multiplier:{multiplier})", max_ap, - 100 * 40 * multiplier as i64 + 100 * 40 * (active_canisters_with_long_executions * multiplier) as i64 ); } test From ed66eeecbcaa0d70f232c696619e430e73cfbccf Mon Sep 17 00:00:00 2001 From: Andriy Berestovskyy Date: Wed, 13 Nov 2024 07:30:32 +0100 Subject: [PATCH 4/4] Revert "Try out Alin's suggestion" This reverts commit d880b8efc56233f45ea2846c586b160ce6495c1e. --- .../src/scheduler/round_schedule.rs | 47 ++++++++++--------- .../src/scheduler/tests.rs | 8 ++-- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/rs/execution_environment/src/scheduler/round_schedule.rs b/rs/execution_environment/src/scheduler/round_schedule.rs index 3a1e1141a07..55ca5dd7202 100644 --- a/rs/execution_environment/src/scheduler/round_schedule.rs +++ b/rs/execution_environment/src/scheduler/round_schedule.rs @@ -324,17 +324,20 @@ impl RoundSchedule { let mut total_charged_priority = 0; for canister_id in fully_executed_canister_ids { if let Some(canister) = canister_states.get_mut(&canister_id) { - total_charged_priority += 100 * multiplier; - // println!( - // "XXX - {} RP:{:>9} AP:{:>9} PC:{} + {}", - // canister.canister_id().get().as_slice()[7], - // canister.scheduler_state.accumulated_priority.get() - // - canister.scheduler_state.priority_credit.get(), - // canister.scheduler_state.accumulated_priority.get(), - // canister.scheduler_state.priority_credit.get(), - // 100 * multiplier - // ); - canister.scheduler_state.priority_credit += (100 * multiplier).into(); + // Cap accumulated priority to -100. + if canister.scheduler_state.accumulated_priority >= (-100 * multiplier).into() { + total_charged_priority += 100 * multiplier; + // println!( + // "XXX - {} RP:{:>9} AP:{:>9} PC:{} + {}", + // canister.canister_id().get().as_slice()[7], + // canister.scheduler_state.accumulated_priority.get() + // - canister.scheduler_state.priority_credit.get(), + // canister.scheduler_state.accumulated_priority.get(), + // canister.scheduler_state.priority_credit.get(), + // 100 * multiplier + // ); + canister.scheduler_state.priority_credit += (100 * multiplier).into(); + } } } @@ -450,19 +453,17 @@ impl RoundSchedule { canister.scheduler_state.priority_credit = Default::default(); } - let accumulated_priority = canister.scheduler_state.accumulated_priority; - let good_standing = accumulated_priority >= 0.into(); let has_aborted_or_paused_execution = canister.has_aborted_execution() || canister.has_paused_execution(); let compute_allocation = canister.scheduler_state.compute_allocation; + let accumulated_priority = canister.scheduler_state.accumulated_priority; round_states.push(CanisterRoundState { canister_id, accumulated_priority, compute_allocation, long_execution_mode: canister.scheduler_state.long_execution_mode, - // Treat long execution just like a best effort new execution. - has_aborted_or_paused_execution: good_standing && has_aborted_or_paused_execution, + has_aborted_or_paused_execution, }); total_compute_allocation_percent += compute_allocation.as_percent() as i64; @@ -543,14 +544,14 @@ impl RoundSchedule { // by the `multiplier`. let long_execution_cores = ((long_executions_compute_allocation + 100 * multiplier - 1) / (100 * multiplier)) as usize; - println!( - "XXX ASS num canisters:{} capacity:{}% mult:{} free/canister:{} LEC:{}", - number_of_canisters, - compute_capacity_percent, - multiplier, - free_capacity_per_canister, - long_execution_cores - ); + // println!( + // "XXX ASS num canisters:{} capacity:{}% mult:{} free/canister:{} LEC:{}", + // number_of_canisters, + // compute_capacity_percent, + // multiplier, + // free_capacity_per_canister, + // long_execution_cores + // ); // If there are long executions, the `long_execution_cores` must be non-zero. debug_assert_or_critical_error!( number_of_long_executions == 0 || long_execution_cores > 0, diff --git a/rs/execution_environment/src/scheduler/tests.rs b/rs/execution_environment/src/scheduler/tests.rs index adbb9e31554..3d385cf0126 100644 --- a/rs/execution_environment/src/scheduler/tests.rs +++ b/rs/execution_environment/src/scheduler/tests.rs @@ -1876,16 +1876,16 @@ fn run_scheduler_test( assert_eq!(inv, 0); // Allow up to 40x divergence for long executions (should be in fact 20x, as 40B/2B = 20). assert!( - min_ap > -100 * 40 * (active_canisters_with_long_executions * multiplier) as i64, + min_ap > -100 * 40 * multiplier as i64, "Error checking min accumulated priority {} > {} (-100% * 40x * multiplier:{multiplier})", min_ap, - -100 * 40 * (active_canisters_with_long_executions * multiplier) as i64 + -100 * 40 * multiplier as i64 ); assert!( - max_ap < 100 * 40 * (active_canisters_with_long_executions * multiplier) as i64, + max_ap < 100 * 40 * multiplier as i64, "Error checking max accumulated priority {} < {} (100% * 40x * multiplier:{multiplier})", max_ap, - 100 * 40 * (active_canisters_with_long_executions * multiplier) as i64 + 100 * 40 * multiplier as i64 ); } test