From ca7b5d1d7ca5fce4b0c9f5ac4e790cd7d9fa8b2c Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 2 Jan 2025 17:21:11 +0100 Subject: [PATCH 1/9] improve performance test case --- .../turbo-tasks-testing/tests/performance.rs | 220 ++++++++++++------ 1 file changed, 149 insertions(+), 71 deletions(-) diff --git a/turbopack/crates/turbo-tasks-testing/tests/performance.rs b/turbopack/crates/turbo-tasks-testing/tests/performance.rs index 1392bd154f574..f8e0a491e9764 100644 --- a/turbopack/crates/turbo-tasks-testing/tests/performance.rs +++ b/turbopack/crates/turbo-tasks-testing/tests/performance.rs @@ -2,9 +2,9 @@ #![feature(arbitrary_self_types_pointers)] #![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this -use std::time::Duration; +use std::{future::Future, time::Duration}; -use turbo_tasks::Vc; +use turbo_tasks::{TransientInstance, Vc}; use turbo_tasks_testing::{register, run, Registration}; static REGISTRATION: Registration = register!(); @@ -12,77 +12,152 @@ static REGISTRATION: Registration = register!(); const COUNT1: u32 = 100; const COUNT2: u32 = 2000; +async fn run_test( + f: impl Fn() -> F + Sync + Send + Copy + 'static, + limit: Duration, +) -> anyhow::Result<()> +where + F: Future> + Sync + Send + 'static, +{ + // The first call will actually execute many_children and its children. + let start = std::time::Instant::now(); + f().await?; + println!("Initial call took {:?}", start.elapsed()); + + // The second call will connect to the cached many_children, but it would be ok if that's + // not yet optimized. + let start = std::time::Instant::now(); + f().await?; + println!("Second call took {:?}", start.elapsed()); + + // Susbsequent calls should be very fast. + let start = std::time::Instant::now(); + for _ in 0..COUNT1 { + f().await?; + } + let subsequent = start.elapsed(); + println!( + "First {} subsequent calls took {:?} ({:?} per call)", + COUNT1, + subsequent, + subsequent / COUNT1 + ); + + let start = std::time::Instant::now(); + for _ in 0..COUNT1 { + f().await?; + } + let subsequent2 = start.elapsed(); + println!( + "Another {} subsequent calls took {:?} ({:?} per call)", + COUNT1, + subsequent2, + subsequent2 / COUNT1 + ); + + let start = std::time::Instant::now(); + for _ in 0..COUNT1 { + f().await?; + } + let subsequent3 = start.elapsed(); + println!( + "Another {} subsequent calls took {:?} ({:?} per call)", + COUNT1, + subsequent3, + subsequent3 / COUNT1 + ); + + if subsequent2 * 2 > subsequent * 3 || subsequent3 * 2 > subsequent * 3 { + // Performance regresses with more calls + // Check if this fixes itself eventually + for i in 0.. { + let start = std::time::Instant::now(); + for _ in 0..COUNT1 { + f().await?; + } + let subsequent4 = start.elapsed(); + println!( + "Another {} subsequent calls took {:?} ({:?} per call)", + COUNT1, + subsequent4, + subsequent4 / COUNT1 + ); + if subsequent4 * 2 < subsequent * 3 { + break; + } + if i >= 20 { + panic!("Performance regressed with more calls"); + } + } + } + + let start = std::time::Instant::now(); + f().await?; + let final_call = start.elapsed(); + println!("Final call took {:?}", final_call); + + assert!( + subsequent < limit * COUNT1, + "Each call should be less than {:?}", + limit + ); + + assert!( + subsequent2 < limit * COUNT1, + "Each call should be less than {:?}", + limit + ); + + assert!( + subsequent3 < limit * COUNT1, + "Each call should be less than {:?}", + limit + ); + + anyhow::Ok(()) +} + #[tokio::test] async fn many_calls_to_many_children() { - if matches!( - std::env::var("TURBOPACK_TEST_PERFORMANCE").ok().as_deref(), - None | Some("") | Some("no") | Some("false") - ) { - println!("Skipping test, pass `TURBOPACK_TEST_PERFORMANCE=yes` to run it"); - return; - } + // if matches!( + // std::env::var("TURBOPACK_TEST_PERFORMANCE").ok().as_deref(), + // None | Some("") | Some("no") | Some("false") + // ) { + // println!("Skipping test, pass `TURBOPACK_TEST_PERFORMANCE=yes` to run it"); + // return; + // } + + run(®ISTRATION, || { + run_test( + || calls_many_children(TransientInstance::new(()), None).strongly_consistent(), + Duration::from_micros(100), + ) + }) + .await + .unwrap(); +} - run(®ISTRATION, || async { - // The first call will actually execute many_children and its children. - let start = std::time::Instant::now(); - calls_many_children(0).strongly_consistent().await?; - println!("Initial call took {:?}", start.elapsed()); - - // The second call will connect to the cached many_children, but it would be ok if that's - // not yet optimized. - let start = std::time::Instant::now(); - calls_many_children(1).strongly_consistent().await?; - println!("Second call took {:?}", start.elapsed()); - - // Susbsequent calls should be very fast. - let start = std::time::Instant::now(); - for i in 2..COUNT1 { - calls_many_children(i).strongly_consistent().await?; - } - let subsequent = start.elapsed(); - println!( - "First {} subsequent calls took {:?}", - COUNT1 - 2, - subsequent - ); - - let start = std::time::Instant::now(); - for i in COUNT1..COUNT1 * 2 - 2 { - calls_many_children(i).strongly_consistent().await?; - } - let subsequent2 = start.elapsed(); - println!( - "Another {} subsequent calls took {:?}", - COUNT1 - 2, - subsequent2 - ); - - let start = std::time::Instant::now(); - calls_many_children(COUNT1 - 1) - .strongly_consistent() - .await?; - let final_call = start.elapsed(); - println!("Final call took {:?}", final_call); - - assert!( - subsequent2 * 2 < subsequent * 3, - "Performance should not regress with more calls" - ); - - assert!( - subsequent < Duration::from_micros(100) * (COUNT1 - 2), - "Each call should be less than 100µs" - ); - - assert!( - subsequent2 < Duration::from_micros(100) * (COUNT1 - 2), - "Each call should be less than 100µs" - ); - - anyhow::Ok(()) +#[tokio::test] +async fn many_calls_to_uncached_many_children() { + // if matches!( + // std::env::var("TURBOPACK_TEST_PERFORMANCE").ok().as_deref(), + // None | Some("") | Some("no") | Some("false") + // ) { + // println!("Skipping test, pass `TURBOPACK_TEST_PERFORMANCE=yes` to run it"); + // return; + // } + + run(®ISTRATION, || { + run_test( + || { + calls_many_children(TransientInstance::new(()), Some(TransientInstance::new(()))) + .strongly_consistent() + }, + Duration::from_micros(100) * COUNT2, + ) }) .await - .unwrap() + .unwrap(); } #[turbo_tasks::value] @@ -91,13 +166,16 @@ struct Value { } #[turbo_tasks::function] -async fn calls_many_children(_i: u32) -> Vc<()> { - let _ = many_children(); +async fn calls_many_children( + _i: TransientInstance<()>, + j: Option>, +) -> Vc<()> { + let _ = many_children(j); Vc::cell(()) } #[turbo_tasks::function] -fn many_children() -> Vc<()> { +fn many_children(_j: Option>) -> Vc<()> { for i in 0..COUNT2 { let _ = many_children_inner(i); } From a6d16c8c063d0e2a70c3097b5888987be6da0678 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 7 Jan 2025 11:54:06 +0100 Subject: [PATCH 2/9] add mark_as_completed --- .../turbo-tasks-backend/src/backend/mod.rs | 27 +++++++++++++++++++ .../crates/turbo-tasks-backend/src/data.rs | 1 + 2 files changed, 28 insertions(+) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index ecb267262eb25..a4944b78be4fc 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -998,6 +998,7 @@ impl TurboTasksBackendInner { once_task, done_event, session_dependent: false, + marked_as_completed: false, }, }); @@ -1319,6 +1320,7 @@ impl TurboTasksBackendInner { once_task: _, stale, session_dependent, + marked_as_completed: _, } = in_progress else { panic!("Task execution completed, but task is not in progress: {task:#?}"); @@ -1669,6 +1671,23 @@ impl TurboTasksBackendInner { } } + fn mark_own_task_as_finished( + &self, + task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi>, + ) { + let mut ctx = self.execute_context(turbo_tasks); + let mut task = ctx.task(task, TaskDataCategory::Data); + if let Some(InProgressState::InProgress { + marked_as_completed, + .. + }) = get_mut!(task, InProgress) + { + *marked_as_completed = true; + // TODO this should remove the dirty state (also check session_dependent) + } + } + fn connect_task( &self, task: TaskId, @@ -1952,6 +1971,14 @@ impl Backend for TurboTasksBackend { self.0.update_task_cell(task_id, cell, content, turbo_tasks); } + fn mark_own_task_as_finished( + &self, + task_id: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi, + ) { + self.0.mark_own_task_as_finished(task_id, turbo_tasks); + } + fn mark_own_task_as_session_dependent( &self, task: TaskId, diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index 5ed4732cc8d91..3a12070f15f1e 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -258,6 +258,7 @@ pub enum InProgressState { #[allow(dead_code)] once_task: bool, session_dependent: bool, + marked_as_completed: bool, done_event: Event, }, } From a65b85a42e0f86fdd0bd8a65eee9fea426d4a494 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 12 Dec 2024 10:28:07 +0100 Subject: [PATCH 3/9] fix aggregation graph bug --- .../backend/operation/aggregation_update.rs | 122 +++++++++++++++--- 1 file changed, 101 insertions(+), 21 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index c93c1e5b2f44a..26ae9fce594c2 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -1245,13 +1245,14 @@ impl AggregationUpdateQueue { }; let mut upper_upper_ids_with_new_follower = Vec::new(); let mut is_aggregate_root = false; + let mut upper_aggregation_numbers = Vec::new(); upper_ids.retain(|&upper_id| { let mut upper = ctx.task(upper_id, TaskDataCategory::Meta); // decide if it should be an inner or follower let upper_aggregation_number = get_aggregation_number(&upper); if !is_root_node(upper_aggregation_number) - && upper_aggregation_number <= follower_aggregation_number + && upper_aggregation_number < follower_aggregation_number { // It's a follower of the upper node if update_count!( @@ -1269,6 +1270,7 @@ impl AggregationUpdateQueue { if upper.has_key(&CachedDataItemKey::AggregateRoot {}) { is_aggregate_root = true; } + upper_aggregation_numbers.push(upper_aggregation_number); true } }); @@ -1277,24 +1279,42 @@ impl AggregationUpdateQueue { let mut follower = ctx.task(new_follower_id, TaskDataCategory::Meta); let mut uppers_count: Option = None; let mut persistent_uppers = 0; - upper_ids.retain(|&upper_id| { - if update_count!(follower, Upper { task: upper_id }, 1) { - // It's a new upper - let uppers_count = uppers_count.get_or_insert_with(|| { - let count = - iter_many!(follower, Upper { .. } count if *count > 0 => ()).count(); - count - 1 - }); - *uppers_count += 1; - if !upper_id.is_transient() { - persistent_uppers += 1; + swap_retain_with_aux_vec( + &mut upper_ids, + &mut upper_aggregation_numbers, + |&mut upper_id, &mut upper_aggregation_number| { + if update_count!(follower, Upper { task: upper_id }, 1) { + // It's a new upper + let uppers_count = uppers_count.get_or_insert_with(|| { + let count = + iter_many!(follower, Upper { .. } count if *count > 0 => ()) + .count(); + count - 1 + }); + *uppers_count += 1; + if !upper_id.is_transient() { + persistent_uppers += 1; + } + + let follower_aggregation_number = get_aggregation_number(&follower); + + // Balancing is only needed when they are equal (or could have become equal + // in the meantime). This is not perfect from concurrent perspective, but we + // can accept a few incorrect invariants in the graph. + if upper_aggregation_number <= follower_aggregation_number { + self.push(AggregationUpdateJob::BalanceEdge { + upper_id, + task_id: new_follower_id, + }); + } + + true + } else { + // It's already an upper + false } - true - } else { - // It's already an upper - false - } - }); + }, + ); #[cfg(feature = "trace_aggregation_update")] let _span = trace_span!("new inner").entered(); if !upper_ids.is_empty() { @@ -1369,16 +1389,17 @@ impl AggregationUpdateQueue { let mut new_followers_of_upper_uppers = Vec::new(); let is_aggregate_root; let mut upper_upper_ids_for_new_followers = Vec::new(); + let upper_aggregation_number; { let mut upper = ctx.task(upper_id, TaskDataCategory::Meta); is_aggregate_root = upper.has_key(&CachedDataItemKey::AggregateRoot {}); // decide if it should be an inner or follower - let upper_aggregation_number = get_aggregation_number(&upper); + upper_aggregation_number = get_aggregation_number(&upper); if !is_root_node(upper_aggregation_number) { followers_with_aggregation_number.retain( |(follower_id, follower_aggregation_number)| { - if upper_aggregation_number <= *follower_aggregation_number { + if upper_aggregation_number < *follower_aggregation_number { // It's a follower of the upper node if update_count!(upper, Follower { task: *follower_id }, 1) { new_followers_of_upper_uppers.push(*follower_id); @@ -1411,12 +1432,23 @@ impl AggregationUpdateQueue { // It's a new upper let data = AggregatedDataUpdate::from_task(&mut follower); let children: Vec<_> = get_followers(&follower); + let follower_aggregation_number = get_aggregation_number(&follower); drop(follower); if !data.is_empty() { upper_data_updates.push(data); } upper_new_followers.extend(children); + + // Balancing is only needed when they are equal (or could have become equal in the + // meantime). This is not perfect from concurrent perspective, but we can accept a + // few incorrect invariants in the graph. + if upper_aggregation_number <= follower_aggregation_number { + self.push(AggregationUpdateJob::BalanceEdge { + upper_id, + task_id: follower_id, + }) + } } } if !upper_new_followers.is_empty() { @@ -1497,7 +1529,7 @@ impl AggregationUpdateQueue { let upper_aggregation_number = get_aggregation_number(&upper); if !is_root_node(upper_aggregation_number) - && upper_aggregation_number <= follower_aggregation_number + && upper_aggregation_number < follower_aggregation_number { #[cfg(feature = "trace_aggregation_update")] let _span = trace_span!("new follower").entered(); @@ -1535,6 +1567,7 @@ impl AggregationUpdateQueue { // It's a new upper let data = AggregatedDataUpdate::from_task(&mut follower); let children: Vec<_> = get_followers(&follower); + let follower_aggregation_number = get_aggregation_number(&follower); drop(follower); if !data.is_empty() { @@ -1555,6 +1588,16 @@ impl AggregationUpdateQueue { new_follower_ids: children, }); } + + // Balancing is only needed when they are equal (or could have become equal in the + // meantime). This is not perfect from concurrent perspective, but we can accept a + // few incorrect invariants in the graph. + if upper_aggregation_number <= follower_aggregation_number { + self.push(AggregationUpdateJob::BalanceEdge { + upper_id, + task_id: new_follower_id, + }); + } } } } @@ -1784,3 +1827,40 @@ impl Operation for AggregationUpdateQueue { } } } + +fn swap_retain_with_aux_vec( + vec: &mut Vec, + aux_vec: &mut Vec, + mut f: impl FnMut(&mut T, &mut A) -> bool, +) { + let mut i = 0; + while i < vec.len() { + if !f(&mut vec[i], &mut aux_vec[i]) { + vec.swap_remove(i); + aux_vec.swap_remove(i); + } else { + i += 1; + } + } +} + +#[cfg(test)] +mod tests { + use crate::backend::operation::aggregation_update::swap_retain_with_aux_vec; + + #[test] + fn test_swap_retain_with_aux_vec() { + let mut vec = vec![1, 2, 3, 4, 5]; + let mut aux = vec![1, 2, 3, 4, 5]; + swap_retain_with_aux_vec(&mut vec, &mut aux, |a, b| { + if *a % 2 == 0 { + false + } else { + *b += 10; + true + } + }); + assert_eq!(vec, vec![1, 5, 3]); + assert_eq!(aux, vec![11, 15, 13]); + } +} From e14a2324fea0c4ee1d25566470dbbde4a2525e19 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 7 Jan 2025 13:45:33 +0100 Subject: [PATCH 4/9] optimize update --- .../backend/operation/aggregation_update.rs | 297 +++++++++--------- 1 file changed, 145 insertions(+), 152 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index 26ae9fce594c2..0b116ed746693 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -1245,14 +1245,13 @@ impl AggregationUpdateQueue { }; let mut upper_upper_ids_with_new_follower = Vec::new(); let mut is_aggregate_root = false; - let mut upper_aggregation_numbers = Vec::new(); - upper_ids.retain(|&upper_id| { + swap_retain(&mut upper_ids, |&mut upper_id| { let mut upper = ctx.task(upper_id, TaskDataCategory::Meta); // decide if it should be an inner or follower let upper_aggregation_number = get_aggregation_number(&upper); if !is_root_node(upper_aggregation_number) - && upper_aggregation_number < follower_aggregation_number + && upper_aggregation_number <= follower_aggregation_number { // It's a follower of the upper node if update_count!( @@ -1264,13 +1263,22 @@ impl AggregationUpdateQueue { ) { upper_upper_ids_with_new_follower.extend(iter_uppers(&upper)); } + + // Balancing is only needed when they are equal. This is not perfect from + // concurrent perspective, but we can accept a few incorrect + // invariants in the graph. + if upper_aggregation_number == follower_aggregation_number { + self.push(AggregationUpdateJob::BalanceEdge { + upper_id, + task_id: new_follower_id, + }); + } false } else { // It's an inner node, continue with the list if upper.has_key(&CachedDataItemKey::AggregateRoot {}) { is_aggregate_root = true; } - upper_aggregation_numbers.push(upper_aggregation_number); true } }); @@ -1279,42 +1287,25 @@ impl AggregationUpdateQueue { let mut follower = ctx.task(new_follower_id, TaskDataCategory::Meta); let mut uppers_count: Option = None; let mut persistent_uppers = 0; - swap_retain_with_aux_vec( - &mut upper_ids, - &mut upper_aggregation_numbers, - |&mut upper_id, &mut upper_aggregation_number| { - if update_count!(follower, Upper { task: upper_id }, 1) { - // It's a new upper - let uppers_count = uppers_count.get_or_insert_with(|| { - let count = - iter_many!(follower, Upper { .. } count if *count > 0 => ()) - .count(); - count - 1 - }); - *uppers_count += 1; - if !upper_id.is_transient() { - persistent_uppers += 1; - } - - let follower_aggregation_number = get_aggregation_number(&follower); - - // Balancing is only needed when they are equal (or could have become equal - // in the meantime). This is not perfect from concurrent perspective, but we - // can accept a few incorrect invariants in the graph. - if upper_aggregation_number <= follower_aggregation_number { - self.push(AggregationUpdateJob::BalanceEdge { - upper_id, - task_id: new_follower_id, - }); - } - - true - } else { - // It's already an upper - false + swap_retain(&mut upper_ids, |&mut upper_id| { + if update_count!(follower, Upper { task: upper_id }, 1) { + // It's a new upper + let uppers_count = uppers_count.get_or_insert_with(|| { + let count = + iter_many!(follower, Upper { .. } count if *count > 0 => ()).count(); + count - 1 + }); + *uppers_count += 1; + if !upper_id.is_transient() { + persistent_uppers += 1; } - }, - ); + + true + } else { + // It's already an upper + false + } + }); #[cfg(feature = "trace_aggregation_update")] let _span = trace_span!("new inner").entered(); if !upper_ids.is_empty() { @@ -1399,16 +1390,24 @@ impl AggregationUpdateQueue { if !is_root_node(upper_aggregation_number) { followers_with_aggregation_number.retain( |(follower_id, follower_aggregation_number)| { - if upper_aggregation_number < *follower_aggregation_number { - // It's a follower of the upper node - if update_count!(upper, Follower { task: *follower_id }, 1) { - new_followers_of_upper_uppers.push(*follower_id); - } - false - } else { + if upper_aggregation_number > *follower_aggregation_number { // It's an inner node, continue with the list - true + return true; } + // It's a follower of the upper node + if update_count!(upper, Follower { task: *follower_id }, 1) { + new_followers_of_upper_uppers.push(*follower_id); + } + if upper_aggregation_number == *follower_aggregation_number { + // Balancing is only needed when they are equal. This is not + // perfect from concurrent perspective, but we + // can accept a few incorrect invariants in the graph. + self.push(AggregationUpdateJob::BalanceEdge { + upper_id, + task_id: *follower_id, + }) + } + false }, ); } @@ -1418,93 +1417,98 @@ impl AggregationUpdateQueue { } } - let mut upper_data_updates = Vec::new(); - let mut upper_new_followers = Vec::new(); - for &(follower_id, _) in followers_with_aggregation_number.iter() { - let mut follower = ctx.task(follower_id, TaskDataCategory::Meta); - if update_count!(follower, Upper { task: upper_id }, 1) { - if !upper_id.is_transient() - && update_ucount_and_get!(follower, PersistentUpperCount, 1).is_power_of_two() - { - self.push_optimize_task(follower_id); - } + if !followers_with_aggregation_number.is_empty() { + #[cfg(feature = "trace_aggregation_update")] + let _span = trace_span!("new inner").entered(); + let mut upper_data_updates = Vec::new(); + let mut upper_new_followers = Vec::new(); + for &(follower_id, _) in followers_with_aggregation_number.iter() { + let mut follower = ctx.task(follower_id, TaskDataCategory::Meta); + if update_count!(follower, Upper { task: upper_id }, 1) { + if !upper_id.is_transient() + && update_ucount_and_get!(follower, PersistentUpperCount, 1) + .is_power_of_two() + { + self.push_optimize_task(follower_id); + } - // It's a new upper - let data = AggregatedDataUpdate::from_task(&mut follower); - let children: Vec<_> = get_followers(&follower); - let follower_aggregation_number = get_aggregation_number(&follower); - drop(follower); + // It's a new upper + let data = AggregatedDataUpdate::from_task(&mut follower); + let children: Vec<_> = get_followers(&follower); + let follower_aggregation_number = get_aggregation_number(&follower); + drop(follower); - if !data.is_empty() { - upper_data_updates.push(data); + if !data.is_empty() { + upper_data_updates.push(data); + } + upper_new_followers.extend(children); + + // Balancing is only needed when they are equal (or could have become equal in + // the meantime). This is not perfect from concurrent + // perspective, but we can accept a few incorrect invariants + // in the graph. + if upper_aggregation_number <= follower_aggregation_number { + self.push(AggregationUpdateJob::BalanceEdge { + upper_id, + task_id: follower_id, + }) + } } - upper_new_followers.extend(children); + } - // Balancing is only needed when they are equal (or could have become equal in the - // meantime). This is not perfect from concurrent perspective, but we can accept a - // few incorrect invariants in the graph. - if upper_aggregation_number <= follower_aggregation_number { - self.push(AggregationUpdateJob::BalanceEdge { - upper_id, - task_id: follower_id, + if !upper_new_followers.is_empty() { + self.push(AggregationUpdateJob::InnerOfUpperHasNewFollowers { + upper_id, + new_follower_ids: upper_new_followers, + }); + } + if !upper_data_updates.is_empty() { + // add data to upper + let mut upper = ctx.task(upper_id, TaskDataCategory::Meta); + let diffs = upper_data_updates + .into_iter() + .filter_map(|data| { + let diff = data.apply(&mut upper, ctx.session_id(), self); + (!diff.is_empty()).then_some(diff) }) + .collect::>(); + let mut iter = diffs.into_iter(); + if let Some(mut diff) = iter.next() { + let upper_ids = get_uppers(&upper); + drop(upper); + // TODO merge AggregatedDataUpdate + for next_diff in iter { + self.push(AggregationUpdateJob::AggregatedDataUpdate { + upper_ids: upper_ids.clone(), + update: diff, + }); + diff = next_diff; + } + self.push(AggregationUpdateJob::AggregatedDataUpdate { + upper_ids, + update: diff, + }); } } + if is_aggregate_root { + self.extend_find_and_schedule_dirty( + followers_with_aggregation_number + .into_iter() + .map(|(id, _)| id), + ); + } } - if !upper_new_followers.is_empty() { + if !new_followers_of_upper_uppers.is_empty() { #[cfg(feature = "trace_aggregation_update")] let _span = trace_span!("new follower").entered(); - - self.push(AggregationUpdateJob::InnerOfUpperHasNewFollowers { - upper_id, - new_follower_ids: upper_new_followers, - }); - } - #[cfg(feature = "trace_aggregation_update")] - let _span = trace_span!("new inner").entered(); - if !upper_data_updates.is_empty() { - // add data to upper - let mut upper = ctx.task(upper_id, TaskDataCategory::Meta); - let diffs = upper_data_updates - .into_iter() - .filter_map(|data| { - let diff = data.apply(&mut upper, ctx.session_id(), self); - (!diff.is_empty()).then_some(diff) - }) - .collect::>(); - let mut iter = diffs.into_iter(); - if let Some(mut diff) = iter.next() { - let upper_ids = get_uppers(&upper); - drop(upper); - // TODO merge AggregatedDataUpdate - for next_diff in iter { - self.push(AggregationUpdateJob::AggregatedDataUpdate { - upper_ids: upper_ids.clone(), - update: diff, - }); - diff = next_diff; - } - self.push(AggregationUpdateJob::AggregatedDataUpdate { - upper_ids, - update: diff, + // notify uppers about new follower + if !upper_upper_ids_for_new_followers.is_empty() { + self.push(AggregationUpdateJob::InnerOfUppersHasNewFollowers { + upper_ids: upper_upper_ids_for_new_followers, + new_follower_ids: new_followers_of_upper_uppers, }); } } - if is_aggregate_root { - self.extend_find_and_schedule_dirty( - followers_with_aggregation_number - .into_iter() - .map(|(id, _)| id), - ); - } - if !new_followers_of_upper_uppers.is_empty() - && !upper_upper_ids_for_new_followers.is_empty() - { - self.push(AggregationUpdateJob::InnerOfUppersHasNewFollowers { - upper_ids: upper_upper_ids_for_new_followers, - new_follower_ids: new_followers_of_upper_uppers, - }); - } } fn inner_of_upper_has_new_follower( @@ -1529,7 +1533,7 @@ impl AggregationUpdateQueue { let upper_aggregation_number = get_aggregation_number(&upper); if !is_root_node(upper_aggregation_number) - && upper_aggregation_number < follower_aggregation_number + && upper_aggregation_number <= follower_aggregation_number { #[cfg(feature = "trace_aggregation_update")] let _span = trace_span!("new follower").entered(); @@ -1550,6 +1554,16 @@ impl AggregationUpdateQueue { new_follower_id, }); } + + // Balancing is only needed when they are equal. This is not perfect from concurrent + // perspective, but we can accept a few incorrect invariants in the + // graph. + if upper_aggregation_number == follower_aggregation_number { + self.push(AggregationUpdateJob::BalanceEdge { + upper_id, + task_id: new_follower_id, + }); + } } } else { #[cfg(feature = "trace_aggregation_update")] @@ -1567,7 +1581,6 @@ impl AggregationUpdateQueue { // It's a new upper let data = AggregatedDataUpdate::from_task(&mut follower); let children: Vec<_> = get_followers(&follower); - let follower_aggregation_number = get_aggregation_number(&follower); drop(follower); if !data.is_empty() { @@ -1588,16 +1601,6 @@ impl AggregationUpdateQueue { new_follower_ids: children, }); } - - // Balancing is only needed when they are equal (or could have become equal in the - // meantime). This is not perfect from concurrent perspective, but we can accept a - // few incorrect invariants in the graph. - if upper_aggregation_number <= follower_aggregation_number { - self.push(AggregationUpdateJob::BalanceEdge { - upper_id, - task_id: new_follower_id, - }); - } } } } @@ -1707,6 +1710,10 @@ impl AggregationUpdateQueue { let _span = trace_span!("check optimize").entered(); let task = ctx.task(task_id, TaskDataCategory::Meta); + let children_count = get!(task, ChildrenCount).copied().unwrap_or_default(); + if children_count == 0 { + return; + } let aggregation_number = get!(task, AggregationNumber).copied().unwrap_or_default(); if is_root_node(aggregation_number.effective) { return; @@ -1828,16 +1835,11 @@ impl Operation for AggregationUpdateQueue { } } -fn swap_retain_with_aux_vec( - vec: &mut Vec, - aux_vec: &mut Vec, - mut f: impl FnMut(&mut T, &mut A) -> bool, -) { +fn swap_retain(vec: &mut Vec, mut f: impl FnMut(&mut T) -> bool) { let mut i = 0; while i < vec.len() { - if !f(&mut vec[i], &mut aux_vec[i]) { + if !f(&mut vec[i]) { vec.swap_remove(i); - aux_vec.swap_remove(i); } else { i += 1; } @@ -1846,21 +1848,12 @@ fn swap_retain_with_aux_vec( #[cfg(test)] mod tests { - use crate::backend::operation::aggregation_update::swap_retain_with_aux_vec; + use crate::backend::operation::aggregation_update::swap_retain; #[test] - fn test_swap_retain_with_aux_vec() { + fn test_swap_retain() { let mut vec = vec![1, 2, 3, 4, 5]; - let mut aux = vec![1, 2, 3, 4, 5]; - swap_retain_with_aux_vec(&mut vec, &mut aux, |a, b| { - if *a % 2 == 0 { - false - } else { - *b += 10; - true - } - }); + swap_retain(&mut vec, |a| if *a % 2 == 0 { false } else { true }); assert_eq!(vec, vec![1, 5, 3]); - assert_eq!(aux, vec![11, 15, 13]); } } From ef44c613246cdf7667fae6a7f826f6d3212dd950 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Wed, 8 Jan 2025 14:57:10 +0100 Subject: [PATCH 5/9] make sure to not miss the power_of_two mark --- .../src/backend/operation/aggregation_update.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index 0b116ed746693..0f558237f0bab 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -1309,8 +1309,10 @@ impl AggregationUpdateQueue { #[cfg(feature = "trace_aggregation_update")] let _span = trace_span!("new inner").entered(); if !upper_ids.is_empty() { - if update_ucount_and_get!(follower, PersistentUpperCount, persistent_uppers) - .is_power_of_two() + let new_count = + update_ucount_and_get!(follower, PersistentUpperCount, persistent_uppers); + if (new_count - persistent_uppers).next_power_of_two() + != new_count.next_power_of_two() { self.push_optimize_task(new_follower_id); } From ebba2caac8438057d340fa2a7752b1a810a00372 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Wed, 8 Jan 2025 08:56:54 +0100 Subject: [PATCH 6/9] add threshold for leaf uppers --- .../backend/operation/aggregation_update.rs | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index 0f558237f0bab..9e9d10afa2f3e 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -34,6 +34,7 @@ use crate::{ pub const LEAF_NUMBER: u32 = 16; const MAX_COUNT_BEFORE_YIELD: usize = 1000; +const MAX_UPPERS_FOR_LEAF: u32 = 3; /// Returns true, when a node is aggregating its children and a partial subgraph. pub fn is_aggregating_node(aggregation_number: u32) -> bool { @@ -1723,6 +1724,23 @@ impl AggregationUpdateQueue { let upper_count = get!(task, PersistentUpperCount) .copied() .unwrap_or_default(); + if !is_aggregating_node(aggregation_number.effective) { + if upper_count > MAX_UPPERS_FOR_LEAF { + #[cfg(feature = "trace_aggregation_update")] + let _span = trace_span!( + "optimize leaf", + old_aggregation_number = aggregation_number.effective, + upper_count + ) + .entered(); + self.push(AggregationUpdateJob::UpdateAggregationNumber { + task_id, + base_aggregation_number: LEAF_NUMBER, + distance: None, + }); + } + return; + } if upper_count <= aggregation_number.effective { // Doesn't need optimization return; @@ -1730,15 +1748,6 @@ impl AggregationUpdateQueue { let uppers = get_uppers(&task); drop(task); - if !is_aggregating_node(aggregation_number.effective) { - self.push(AggregationUpdateJob::UpdateAggregationNumber { - task_id, - base_aggregation_number: LEAF_NUMBER, - distance: None, - }); - return; - } - let mut root_uppers = 0; let mut uppers_aggregation_numbers = uppers From 975663661d710440c79cc3e0f279975c43079d79 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Wed, 8 Jan 2025 22:12:27 +0100 Subject: [PATCH 7/9] add test case --- .../turbo-tasks-testing/tests/performance.rs | 193 ++++++++++++++++-- 1 file changed, 175 insertions(+), 18 deletions(-) diff --git a/turbopack/crates/turbo-tasks-testing/tests/performance.rs b/turbopack/crates/turbo-tasks-testing/tests/performance.rs index f8e0a491e9764..3547540de8a6a 100644 --- a/turbopack/crates/turbo-tasks-testing/tests/performance.rs +++ b/turbopack/crates/turbo-tasks-testing/tests/performance.rs @@ -4,6 +4,7 @@ use std::{future::Future, time::Duration}; +use anyhow::Result; use turbo_tasks::{TransientInstance, Vc}; use turbo_tasks_testing::{register, run, Registration}; @@ -13,22 +14,26 @@ const COUNT1: u32 = 100; const COUNT2: u32 = 2000; async fn run_test( - f: impl Fn() -> F + Sync + Send + Copy + 'static, + f: impl Fn() -> F + Sync + Send + Clone + 'static, limit: Duration, ) -> anyhow::Result<()> where F: Future> + Sync + Send + 'static, { - // The first call will actually execute many_children and its children. + // The first call will actually execute everything. let start = std::time::Instant::now(); f().await?; println!("Initial call took {:?}", start.elapsed()); - // The second call will connect to the cached many_children, but it would be ok if that's - // not yet optimized. - let start = std::time::Instant::now(); - f().await?; - println!("Second call took {:?}", start.elapsed()); + let mut warmup_calls = Vec::new(); + + for _ in 0..10 { + let start = std::time::Instant::now(); + f().await?; + let warmup_call = start.elapsed(); + println!("Subsequent call took {:?}", warmup_call); + warmup_calls.push(warmup_call); + } // Susbsequent calls should be very fast. let start = std::time::Instant::now(); @@ -96,6 +101,17 @@ where let final_call = start.elapsed(); println!("Final call took {:?}", final_call); + let target = subsequent / COUNT1; + + for (i, warmup_call) in warmup_calls.into_iter().enumerate() { + assert!( + warmup_call < target * 10, + "Warmup call {} should be less than {:?}", + i, + target * 10 + ); + } + assert!( subsequent < limit * COUNT1, "Each call should be less than {:?}", @@ -117,16 +133,23 @@ where anyhow::Ok(()) } -#[tokio::test] -async fn many_calls_to_many_children() { +fn check_skip() -> bool { // if matches!( // std::env::var("TURBOPACK_TEST_PERFORMANCE").ok().as_deref(), // None | Some("") | Some("no") | Some("false") // ) { // println!("Skipping test, pass `TURBOPACK_TEST_PERFORMANCE=yes` to run it"); - // return; + // return true; // } + false +} + +#[tokio::test] +async fn many_calls_to_many_children() { + if check_skip() { + return; + } run(®ISTRATION, || { run_test( || calls_many_children(TransientInstance::new(()), None).strongly_consistent(), @@ -139,14 +162,9 @@ async fn many_calls_to_many_children() { #[tokio::test] async fn many_calls_to_uncached_many_children() { - // if matches!( - // std::env::var("TURBOPACK_TEST_PERFORMANCE").ok().as_deref(), - // None | Some("") | Some("no") | Some("false") - // ) { - // println!("Skipping test, pass `TURBOPACK_TEST_PERFORMANCE=yes` to run it"); - // return; - // } - + if check_skip() { + return; + } run(®ISTRATION, || { run_test( || { @@ -160,6 +178,126 @@ async fn many_calls_to_uncached_many_children() { .unwrap(); } +fn run_big_graph_test(counts: Vec) -> impl Future> + Send + 'static { + println!( + "Graph {:?} = {} tasks", + counts, + (1..=counts.len()) + .into_iter() + .map(|i| counts.iter().take(i).product::()) + .sum::() + ); + run_test( + move || calls_big_graph(counts.clone(), TransientInstance::new(())).strongly_consistent(), + Duration::from_micros(100), + ) +} + +#[tokio::test] +async fn many_calls_to_big_graph_1() { + if check_skip() { + return; + } + run(®ISTRATION, || run_big_graph_test(vec![5, 8, 10, 15, 20])) + .await + .unwrap(); +} + +#[tokio::test] +async fn many_calls_to_big_graph_2() { + if check_skip() { + return; + } + run(®ISTRATION, || { + run_big_graph_test(vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn many_calls_to_big_graph_3() { + if check_skip() { + return; + } + run(®ISTRATION, || run_big_graph_test(vec![1000, 3, 3, 3, 3])) + .await + .unwrap(); +} + +#[tokio::test] +async fn many_calls_to_big_graph_4() { + if check_skip() { + return; + } + run(®ISTRATION, || run_big_graph_test(vec![3, 3, 3, 3, 1000])) + .await + .unwrap(); +} + +#[tokio::test] +async fn many_calls_to_big_graph_5() { + if check_skip() { + return; + } + run(®ISTRATION, || { + run_big_graph_test(vec![10, 10, 10, 10, 10]) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn many_calls_to_big_graph_6() { + if check_skip() { + return; + } + run(®ISTRATION, || { + run_big_graph_test(vec![2, 2, 2, 1000, 2, 2, 2]) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn many_calls_to_big_graph_7() { + if check_skip() { + return; + } + run(®ISTRATION, || { + run_big_graph_test(vec![ + 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 3, 1, 1, 1, 3, 2, 1, 1, 1, 1, 5, 1, 1, 1, 200, 2, 1, + 1, 1, 1, 1, 1, 1, 1, 1, + ]) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn many_calls_to_big_graph_8() { + if check_skip() { + return; + } + run(®ISTRATION, || { + run_big_graph_test(vec![200, 2, 2, 2, 2, 200]) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn many_calls_to_big_graph_9() { + if check_skip() { + return; + } + run(®ISTRATION, || { + run_big_graph_test(vec![10000, 1, 1, 2, 1, 1, 2, 2, 1, 1, 1, 1]) + }) + .await + .unwrap(); +} + #[turbo_tasks::value] struct Value { value: u32, @@ -186,3 +324,22 @@ fn many_children(_j: Option>) -> Vc<()> { fn many_children_inner(_i: u32) -> Vc<()> { Vc::cell(()) } + +#[turbo_tasks::function] +async fn calls_big_graph(mut counts: Vec, _i: TransientInstance<()>) -> Vc<()> { + counts.reverse(); + let _ = big_graph(counts, vec![]); + Vc::cell(()) +} + +#[turbo_tasks::function] +fn big_graph(mut counts: Vec, keys: Vec) -> Vc<()> { + let Some(count) = counts.pop() else { + return Vc::cell(()); + }; + for i in 0..count { + let new_keys = keys.iter().copied().chain(std::iter::once(i)).collect(); + let _ = big_graph(counts.clone(), new_keys); + } + Vc::cell(()) +} From 046cfa2e96ed3286affdd2d742588c0c4e649d10 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Mon, 13 Jan 2025 22:23:28 +0100 Subject: [PATCH 8/9] clippy --- .../src/backend/operation/aggregation_update.rs | 16 +++++++--------- .../turbo-tasks-testing/tests/performance.rs | 1 - 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index 9e9d10afa2f3e..3f2359ff2202c 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -1501,16 +1501,14 @@ impl AggregationUpdateQueue { ); } } - if !new_followers_of_upper_uppers.is_empty() { + // notify uppers about new follower + if !upper_upper_ids_for_new_followers.is_empty() { #[cfg(feature = "trace_aggregation_update")] let _span = trace_span!("new follower").entered(); - // notify uppers about new follower - if !upper_upper_ids_for_new_followers.is_empty() { - self.push(AggregationUpdateJob::InnerOfUppersHasNewFollowers { - upper_ids: upper_upper_ids_for_new_followers, - new_follower_ids: new_followers_of_upper_uppers, - }); - } + self.push(AggregationUpdateJob::InnerOfUppersHasNewFollowers { + upper_ids: upper_upper_ids_for_new_followers, + new_follower_ids: new_followers_of_upper_uppers, + }); } } @@ -1864,7 +1862,7 @@ mod tests { #[test] fn test_swap_retain() { let mut vec = vec![1, 2, 3, 4, 5]; - swap_retain(&mut vec, |a| if *a % 2 == 0 { false } else { true }); + swap_retain(&mut vec, |a| *a % 2 != 0); assert_eq!(vec, vec![1, 5, 3]); } } diff --git a/turbopack/crates/turbo-tasks-testing/tests/performance.rs b/turbopack/crates/turbo-tasks-testing/tests/performance.rs index 3547540de8a6a..8bd9d6b553b76 100644 --- a/turbopack/crates/turbo-tasks-testing/tests/performance.rs +++ b/turbopack/crates/turbo-tasks-testing/tests/performance.rs @@ -183,7 +183,6 @@ fn run_big_graph_test(counts: Vec) -> impl Future> + Se "Graph {:?} = {} tasks", counts, (1..=counts.len()) - .into_iter() .map(|i| counts.iter().take(i).product::()) .sum::() ); From 58b9f293a125f4d31a1c6b645dba797e2008d409 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 14 Jan 2025 13:41:54 +0100 Subject: [PATCH 9/9] skip performance test case without env var for flakyness reasons --- .../turbo-tasks-testing/tests/performance.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/turbopack/crates/turbo-tasks-testing/tests/performance.rs b/turbopack/crates/turbo-tasks-testing/tests/performance.rs index 8bd9d6b553b76..f576ba05caf09 100644 --- a/turbopack/crates/turbo-tasks-testing/tests/performance.rs +++ b/turbopack/crates/turbo-tasks-testing/tests/performance.rs @@ -134,13 +134,13 @@ where } fn check_skip() -> bool { - // if matches!( - // std::env::var("TURBOPACK_TEST_PERFORMANCE").ok().as_deref(), - // None | Some("") | Some("no") | Some("false") - // ) { - // println!("Skipping test, pass `TURBOPACK_TEST_PERFORMANCE=yes` to run it"); - // return true; - // } + if matches!( + std::env::var("TURBOPACK_TEST_PERFORMANCE").ok().as_deref(), + None | Some("") | Some("no") | Some("false") + ) { + println!("Skipping test, pass `TURBOPACK_TEST_PERFORMANCE=yes` to run it"); + return true; + } false }