diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
index fae89c59b9713..fafd96844a299 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
@@ -42,9 +42,10 @@ use crate::backend::operation::TaskDirtyCause;
 use crate::{
     backend::{
         operation::{
-            get_aggregation_number, is_root_node, AggregatedDataUpdate, AggregationUpdateJob,
-            AggregationUpdateQueue, CleanupOldEdgesOperation, ConnectChildOperation,
-            ExecuteContext, ExecuteContextImpl, Operation, OutdatedEdge, TaskGuard,
+            connect_children, get_aggregation_number, is_root_node, AggregatedDataUpdate,
+            AggregationUpdateJob, AggregationUpdateQueue, CleanupOldEdgesOperation,
+            ConnectChildOperation, ExecuteContext, ExecuteContextImpl, Operation, OutdatedEdge,
+            TaskGuard,
         },
         persisted_storage_log::PersistedStorageLog,
         storage::{get, get_many, get_mut, get_mut_or_insert_with, iter_many, remove, Storage},
@@ -53,7 +54,8 @@ use crate::{
     data::{
         ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
         CachedDataItemValue, CachedDataItemValueRef, CachedDataUpdate, CellRef, CollectibleRef,
-        CollectiblesRef, DirtyState, InProgressCellState, InProgressState, OutputValue, RootType,
+        CollectiblesRef, DirtyState, InProgressCellState, InProgressState, InProgressStateInner,
+        OutputValue, RootType,
     },
     utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc, sharded::Sharded},
 };
@@ -398,21 +400,31 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
         let mut ctx = self.execute_context(turbo_tasks);
         let mut task = ctx.task(task_id, TaskDataCategory::All);
 
-        if let Some(in_progress) = get!(task, InProgress) {
-            match in_progress {
-                InProgressState::Scheduled { done_event, .. }
-                | InProgressState::InProgress { done_event, .. } => {
-                    let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
-                    let listener = done_event.listen_with_note(move || {
-                        if let Some(reader_desc) = reader_desc.as_ref() {
-                            format!("try_read_task_output from {}", reader_desc())
-                        } else {
-                            "try_read_task_output (untracked)".to_string()
-                        }
-                    });
-                    return Ok(Err(listener));
+        fn check_in_progress<B: BackingStorage>(
+            this: &TurboTasksBackendInner<B>,
+            task: &impl TaskGuard,
+            reader: Option<TaskId>,
+        ) -> Option<Result<Result<RawVc, EventListener>, anyhow::Error>> {
+            if let Some(in_progress) = get!(task, InProgress) {
+                match in_progress {
+                    InProgressState::Scheduled { done_event, .. }
+                    | InProgressState::InProgress(box InProgressStateInner {
+                        done_event, ..
+                    }) => {
+                        let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
+                        let listener = done_event.listen_with_note(move || {
+                            if let Some(reader_desc) = reader_desc.as_ref() {
+                                format!("try_read_task_output from {}", reader_desc())
+                            } else {
+                                "try_read_task_output (untracked)".to_string()
+                            }
+                        });
+                        return Some(Ok(Err(listener)));
+                    }
                 }
             }
+            None
         }
 
         if self.should_track_children() && matches!(consistency, ReadConsistency::Strong) {
@@ -450,14 +462,18 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
                 .unwrap_or_default()
                 .get(self.session_id);
             if dirty_tasks > 0 || is_dirty {
-                let root = get!(task, Activeness);
+                let root = get_mut!(task, Activeness);
                 let mut task_ids_to_schedule: Vec<_> = Vec::new();
                 // When there are dirty tasks, subscribe to the all_clean_event
                 let root = if let Some(root) = root {
+                    // This makes sure all tasks stay active and this task won't go stale.
+                    // active_until_clean is automatically removed when this
+                    // task is clean.
+                    root.set_active_until_clean();
                     root
                 } else {
                     // If we don't have a root state, add one. This also makes sure all tasks stay
-                    // active and this task won't stale. CachedActiveUntilClean
+                    // active and this task won't go stale. active_until_clean
                     // is automatically removed when this task is clean.
                     get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id))
                         .set_active_until_clean();
@@ -470,9 +486,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
                         task
                     }
                 );
-                if is_dirty {
-                    task_ids_to_schedule.push(task_id);
-                }
+                task_ids_to_schedule.push(task_id);
                 get!(task, Activeness).unwrap()
             };
             let listener = root.all_clean_event.listen_with_note(move || {
@@ -492,6 +506,10 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
             }
         }
 
+        if let Some(value) = check_in_progress(self, &task, reader) {
+            return value;
+        }
+
         if let Some(output) = get!(task, Output) {
             let result = match output {
                 OutputValue::Cell(cell) => Some(Ok(Ok(RawVc::TaskCell(cell.task, cell.cell)))),
@@ -995,40 +1013,17 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
             return None;
         };
         task.add_new(CachedDataItem::InProgress {
-            value: InProgressState::InProgress {
+            value: InProgressState::InProgress(Box::new(InProgressStateInner {
                 stale: false,
                 once_task,
                 done_event,
                 session_dependent: false,
                 marked_as_completed: false,
-            },
+                new_children: Default::default(),
+            })),
         });
 
         if self.should_track_children() {
-            // Make all current children outdated (remove left-over outdated children)
-            enum Child {
-                Current(TaskId),
-                Outdated(TaskId),
-            }
-            let children = iter_many!(task, Child { task } => Child::Current(task))
-                .chain(iter_many!(task, OutdatedChild { task } => Child::Outdated(task)))
-                .collect::<Vec<_>>();
-            for child in children {
-                match child {
-                    Child::Current(child) => {
-                        let _ = task.add(CachedDataItem::OutdatedChild {
-                            task: child,
-                            value: (),
-                        });
-                    }
-                    Child::Outdated(child) => {
-                        if !task.has_key(&CachedDataItemKey::Child { task: child }) {
-                            task.remove(&CachedDataItemKey::OutdatedChild { task: child });
-                        }
-                    }
-                }
-            }
-
             // Make all current collectibles outdated (remove left-over outdated collectibles)
             enum Collectible {
                 Current(CollectibleRef, i32),
@@ -1141,26 +1136,47 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
         stateful: bool,
         turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
     ) -> bool {
+        let _span = tracing::trace_span!("task execution completed").entered();
         let mut ctx = self.execute_context(turbo_tasks);
         let mut task = ctx.task(task_id, TaskDataCategory::All);
-        let Some(in_progress) = get!(task, InProgress) else {
+        let Some(in_progress) = get_mut!(task, InProgress) else {
             panic!("Task execution completed, but task is not in progress: {task:#?}");
         };
-        let &InProgressState::InProgress { stale, .. } = in_progress else {
+        let &mut InProgressState::InProgress(box InProgressStateInner {
+            stale,
+            ref mut new_children,
+            ..
+        }) = in_progress
+        else {
             panic!("Task execution completed, but task is not in progress: {task:#?}");
         };
 
         // If the task is stale, reschedule it
         if stale {
-            let Some(InProgressState::InProgress { done_event, .. }) = remove!(task, InProgress)
+            let Some(InProgressState::InProgress(box InProgressStateInner {
+                done_event,
+                new_children,
+                ..
+            })) = remove!(task, InProgress)
             else {
                 unreachable!();
            };
             task.add_new(CachedDataItem::InProgress {
                 value: InProgressState::Scheduled { done_event },
             });
+            drop(task);
+
+            // All `new_children` are currently held active with an active count and we need to
+            // undo that.
+            AggregationUpdateQueue::run(
+                AggregationUpdateJob::DecreaseActiveCounts {
+                    task_ids: new_children.into_iter().collect(),
+                },
+                &mut ctx,
+            );
             return true;
         }
+        let mut new_children = take(new_children);
 
         // TODO handle stateful
         let _ = stateful;
@@ -1187,9 +1203,30 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
             task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type });
         }
 
-        let mut removed_data: Vec<CachedDataItem> = Vec::new();
+        let mut queue = AggregationUpdateQueue::new();
+
+        let mut removed_data = Vec::new();
         let mut old_edges = Vec::new();
 
+        // Connect children
+        {
+            for old_child in iter_many!(task, Child { task } => task) {
+                if !new_children.remove(&old_child) {
+                    old_edges.push(OutdatedEdge::Child(old_child));
+                }
+            }
+
+            let has_active_count =
+                get!(task, Activeness).map_or(false, |activeness| activeness.active_counter > 0);
+            connect_children(
+                task_id,
+                &mut task,
+                new_children,
+                &mut queue,
+                has_active_count,
+            );
+        }
+
         // Remove no longer existing cells and notify in progress cells
         // find all outdated data items (removed cells, outdated edges)
         removed_data.extend(
@@ -1214,12 +1251,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
                     .get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
             }));
         if self.should_track_children() {
-            old_edges.extend(task.iter(CachedDataItemType::OutdatedChild).filter_map(
-                |(key, _)| match key {
-                    CachedDataItemKey::OutdatedChild { task } => Some(OutdatedEdge::Child(task)),
-                    _ => None,
-                },
-            ));
             old_edges.extend(
                 task.iter(CachedDataItemType::OutdatedCollectible)
                     .filter_map(|(key, value)| match (key, value) {
@@ -1256,10 +1287,13 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
 
         drop(task);
 
-        // Remove outdated edges first, before removing in_progress+dirty flag.
-        // We need to make sure all outdated edges are removed before the task can potentially be
-        // scheduled and executed again
-        CleanupOldEdgesOperation::run(task_id, old_edges, &mut ctx);
+        {
+            let _span = tracing::trace_span!("CleanupOldEdgesOperation").entered();
+            // Remove outdated edges first, before removing in_progress+dirty flag.
+            // We need to make sure all outdated edges are removed before the task can potentially
+            // be scheduled and executed again
+            CleanupOldEdgesOperation::run(task_id, old_edges, queue, &mut ctx);
+        }
 
         // When restoring from persistent caching the following might not be executed (since we can
         // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
@@ -1269,16 +1303,18 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
         let Some(in_progress) = remove!(task, InProgress) else {
             panic!("Task execution completed, but task is not in progress: {task:#?}");
         };
-        let InProgressState::InProgress {
+        let InProgressState::InProgress(box InProgressStateInner {
             done_event,
             once_task: _,
             stale,
             session_dependent,
             marked_as_completed: _,
-        } = in_progress
+            new_children,
+        }) = in_progress
         else {
             panic!("Task execution completed, but task is not in progress: {task:#?}");
         };
+        debug_assert!(new_children.is_empty());
 
         // If the task is stale, reschedule it
         if stale {
@@ -1618,9 +1654,10 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
     ) {
         let mut ctx = self.execute_context(turbo_tasks);
         let mut task = ctx.task(task, TaskDataCategory::Data);
-        if let Some(InProgressState::InProgress {
-            session_dependent, ..
-        }) = get_mut!(task, InProgress)
+        if let Some(InProgressState::InProgress(box InProgressStateInner {
+            session_dependent,
+            ..
+        })) = get_mut!(task, InProgress)
         {
             *session_dependent = true;
         }
@@ -1633,10 +1670,10 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
     ) {
         let mut ctx = self.execute_context(turbo_tasks);
         let mut task = ctx.task(task, TaskDataCategory::Data);
-        if let Some(InProgressState::InProgress {
+        if let Some(InProgressState::InProgress(box InProgressStateInner {
             marked_as_completed,
             ..
-        }) = get_mut!(task, InProgress)
+        })) = get_mut!(task, InProgress)
         {
             *marked_as_completed = true;
             // TODO this should remove the dirty state (also check session_dependent)
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs
index 3d46de35a6848..603573b5e9bc6 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs
@@ -8,7 +8,7 @@ use std::{
 
 use indexmap::map::Entry;
 use rustc_hash::{FxHashMap, FxHashSet};
-use serde::{Deserialize, Serialize};
+use serde::{ser::SerializeSeq, Deserialize, Serialize, Serializer};
 use smallvec::{smallvec, SmallVec};
 #[cfg(any(
     feature = "trace_aggregation_update",
@@ -169,17 +169,23 @@ pub enum AggregationUpdateJob {
         collectible_type: turbo_tasks::TraitTypeId,
     },
     /// Increases the active counter of the task
+    #[serde(skip)]
     IncreaseActiveCount { task: TaskId },
     /// Increases the active counters of the tasks
+    #[serde(skip)]
     IncreaseActiveCounts { task_ids: TaskIdVec },
     /// Decreases the active counter of the task
+    #[serde(skip)]
     DecreaseActiveCount { task: TaskId },
     /// Decreases the active counters of the tasks
+    #[serde(skip)]
     DecreaseActiveCounts { task_ids: TaskIdVec },
     /// Balances the edges of the graph. This checks if the graph invariant is still met for this
     /// edge and converts an upper edge to a follower edge or vice versa. Balancing might trigger
     /// more changes to the structure.
     BalanceEdge { upper_id: TaskId, task_id: TaskId },
+    /// Does nothing. This is used to filter out transient jobs during serialization.
+    Noop,
 }
 
 impl AggregationUpdateJob {
@@ -561,9 +567,37 @@ impl PartialEq for FindAndScheduleJob {
 
 impl Eq for FindAndScheduleJob {}
 
+/// Serializes the jobs in the queue. This is used to filter out transient jobs during
+/// serialization.
+fn serialize_jobs<S: Serializer>(
+    jobs: &VecDeque<AggregationUpdateJobItem>,
+    serializer: S,
+) -> Result<S::Ok, S::Error> {
+    let mut seq = serializer.serialize_seq(Some(jobs.len()))?;
+    for job in jobs {
+        match job.job {
+            AggregationUpdateJob::IncreaseActiveCount { .. }
+            | AggregationUpdateJob::IncreaseActiveCounts { .. }
+            | AggregationUpdateJob::DecreaseActiveCount { .. }
+            | AggregationUpdateJob::DecreaseActiveCounts { .. } => {
+                seq.serialize_element(&AggregationUpdateJobItem {
+                    job: AggregationUpdateJob::Noop,
+                    #[cfg(feature = "trace_aggregation_update")]
+                    span: None,
+                })?;
+            }
+            _ => {
+                seq.serialize_element(job)?;
+            }
+        }
+    }
+    seq.end()
+}
+
 /// A queue for aggregation update jobs.
 #[derive(Default, Serialize, Deserialize, Clone)]
 pub struct AggregationUpdateQueue {
+    #[serde(serialize_with = "serialize_jobs")]
     jobs: VecDeque<AggregationUpdateJobItem>,
     number_updates: FxIndexMap<TaskId, AggregationNumberUpdate>,
     done_number_updates: FxHashMap<TaskId, AggregationNumberUpdate>,
@@ -706,8 +740,9 @@ impl AggregationUpdateQueue {
 
     /// Executes a single step of the queue. Returns true when the queue is empty.
     pub fn process(&mut self, ctx: &mut impl ExecuteContext) -> bool {
         if let Some(job) = self.jobs.pop_front() {
-            let job = job.entered();
+            let job: AggregationUpdateJobGuard = job.entered();
             match job.job {
+                AggregationUpdateJob::Noop => {}
                 AggregationUpdateJob::UpdateAggregationNumber { .. }
                 | AggregationUpdateJob::BalanceEdge { .. } => {
                     // These jobs are never pushed to the queue
@@ -1000,8 +1035,6 @@ impl AggregationUpdateQueue {
                     // followers
                     let data = AggregatedDataUpdate::from_task(&mut task);
                     let followers = get_followers(&task);
-                    let has_active_count =
-                        get!(task, Activeness).is_some_and(|a| a.active_counter > 0);
                     let diff = data.apply(&mut upper, ctx.session_id(), self);
 
                     if !upper_ids.is_empty() && !diff.is_empty() {
@@ -1015,12 +1048,6 @@ impl AggregationUpdateQueue {
                         );
                     }
                     if !followers.is_empty() {
-                        if has_active_count {
-                            // TODO combine both operations to avoid the clone
-                            self.push(AggregationUpdateJob::IncreaseActiveCounts {
-                                task_ids: followers.clone(),
-                            })
-                        }
                         self.push(AggregationUpdateJob::InnerOfUpperHasNewFollowers {
                             upper_id,
                             new_follower_ids: followers,
@@ -1041,6 +1068,14 @@ impl AggregationUpdateQueue {
                             lost_follower_id: task_id,
                         });
                     }
+
+                    // Follower was removed, we might need to update the active count
+                    let has_active_count =
+                        get!(upper, Activeness).is_some_and(|a| a.active_counter > 0);
+                    if has_active_count {
+                        // TODO combine both operations to avoid the clone
+                        self.push(AggregationUpdateJob::DecreaseActiveCount { task: task_id })
+                    }
                 }
                 std::cmp::Ordering::Equal => {}
             }
@@ -1065,7 +1100,9 @@ impl AggregationUpdateQueue {
                     self.push_optimize_task(upper_id);
                 }
                 // update active count
-                if get!(task, Activeness).is_some_and(|a| a.active_counter > 0) {
+                let has_active_count =
+                    get!(task, Activeness).is_some_and(|a| a.active_counter > 0);
+                if has_active_count {
                     self.push(AggregationUpdateJob::IncreaseActiveCount { task: task_id });
                 }
                 // notify uppers about new follower
@@ -1140,17 +1177,18 @@ impl AggregationUpdateQueue {
                     ctx.schedule(task_id);
                 }
             }
-            if is_aggregating_node(get_aggregation_number(&task)) {
+            let aggregation_number = get_aggregation_number(&task);
+            if is_aggregating_node(aggregation_number) {
                 // if it has `Activeness` we can skip visiting the nested nodes since
                 // this would already be scheduled by the `Activeness`
-                if !task.has_key(&CachedDataItemKey::Activeness {}) {
+                let is_active_until_clean =
+                    get!(task, Activeness).is_some_and(|a| a.active_until_clean);
+                if !is_active_until_clean {
                     let dirty_containers: Vec<_> = get_many!(task, AggregatedDirtyContainer { task } count if count.get(session_id) > 0 => task);
                     if !dirty_containers.is_empty() || dirty {
-                        let mut activeness_state = ActivenessState::new(task_id);
+                        let activeness_state =
+                            get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id));
                         activeness_state.set_active_until_clean();
-                        task.insert(CachedDataItem::Activeness {
-                            value: activeness_state,
-                        });
                         drop(task);
 
                         self.extend_find_and_schedule_dirty(dirty_containers);
@@ -1424,7 +1462,9 @@ impl AggregationUpdateQueue {
                 }
                 // update active count
-                if get!(upper, Activeness).is_some_and(|a| a.active_counter > 0) {
+                let has_active_count =
+                    get!(upper, Activeness).is_some_and(|a| a.active_counter > 0);
+                if has_active_count {
                     tasks_for_which_increment_active_count.push(new_follower_id);
                 }
                 // notify uppers about new follower
@@ -1612,7 +1652,7 @@ impl AggregationUpdateQueue {
         for &(follower_id, _) in followers_with_aggregation_number.iter() {
             let mut follower = ctx.task(follower_id, TaskDataCategory::Meta);
             if update_count!(follower, Upper { task: upper_id }, 1) {
-                if !upper_id.is_transient() && count!(follower, Upper).is_power_of_two() {
+                if count!(follower, Upper).is_power_of_two() {
                     self.push_optimize_task(follower_id);
                 }
@@ -1631,7 +1671,9 @@ impl AggregationUpdateQueue {
                     // the meantime). This is not perfect from concurrent
                     // perspective, but we can accept a few incorrect invariants
                     // in the graph.
-                    if upper_aggregation_number <= follower_aggregation_number {
+                    if upper_aggregation_number <= follower_aggregation_number
+                        && !is_root_node(upper_aggregation_number)
+                    {
                         self.push(AggregationUpdateJob::BalanceEdge {
                             upper_id,
                             task_id: follower_id,
                         });
                     }
@@ -1720,13 +1762,16 @@ impl AggregationUpdateQueue {
         #[cfg(feature = "trace_aggregation_update")]
         let _span = trace_span!("process new follower").entered();
 
-        let follower_aggregation_number = {
+        let (follower_aggregation_number, already_active) = {
             let follower = ctx.task(new_follower_id, TaskDataCategory::Meta);
-            get_aggregation_number(&follower)
+            (
+                get_aggregation_number(&follower),
+                follower.has_key(&CachedDataItemKey::Activeness {}),
+            )
         };
 
         let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
-        if upper.has_key(&CachedDataItemKey::Activeness {}) {
+        if !already_active && upper.has_key(&CachedDataItemKey::Activeness {}) {
             self.push_find_and_schedule_dirty(new_follower_id);
         }
         // decide if it should be an inner or follower
@@ -1787,12 +1832,12 @@ impl AggregationUpdateQueue {
             drop(upper);
             let mut follower = ctx.task(new_follower_id, TaskDataCategory::Meta);
             if update_count!(follower, Upper { task: upper_id }, 1) {
-                if !upper_id.is_transient() && count!(follower, Upper).is_power_of_two() {
+                if count!(follower, Upper).is_power_of_two() {
                     self.push_optimize_task(new_follower_id);
                 }
                 // It's a new upper
                 let data = AggregatedDataUpdate::from_task(&mut follower);
-                let children = get_followers(&follower);
+                let followers = get_followers(&follower);
                 drop(follower);
 
                 if !data.is_empty() {
@@ -1810,10 +1855,10 @@ impl AggregationUpdateQueue {
                         );
                     }
                 }
-                if !children.is_empty() {
+                if !followers.is_empty() {
                     self.push(AggregationUpdateJob::InnerOfUpperHasNewFollowers {
                         upper_id,
-                        new_follower_ids: children,
+                        new_follower_ids: followers,
                     });
                 }
             }
@@ -1848,13 +1893,13 @@ impl AggregationUpdateQueue {
             let mut task = ctx.task(task_id, TaskDataCategory::Meta);
             let state = get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id));
-            let is_positive = state.increment_active_counter();
+            let is_positive_now = state.increment_active_counter();
             let is_empty = state.is_empty();
-            // This can happen if active count is negative before
+            // This can happen if active count was negative before
             if is_empty {
                 task.remove(&CachedDataItemKey::Activeness {});
             }
-            if is_positive {
+            if is_positive_now {
                 let followers = get_followers(&task);
                 drop(task);
                 if !followers.is_empty() {
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs
index e73386aea00e1..7df97577c655e 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs
@@ -54,8 +54,12 @@ pub enum OutdatedEdge {
 }
 
 impl CleanupOldEdgesOperation {
-    pub fn run(task_id: TaskId, outdated: Vec<OutdatedEdge>, ctx: &mut impl ExecuteContext) {
-        let queue = AggregationUpdateQueue::new();
+    pub fn run(
+        task_id: TaskId,
+        outdated: Vec<OutdatedEdge>,
+        queue: AggregationUpdateQueue,
+        ctx: &mut impl ExecuteContext,
+    ) {
         CleanupOldEdgesOperation::RemoveEdges {
             task_id,
             outdated,
@@ -98,8 +102,10 @@ impl Operation for CleanupOldEdgesOperation {
                                 });
                             } else {
                                 let upper_ids = get_uppers(&task);
-                                if get!(task, Activeness).is_some_and(|a| a.active_counter > 0)
-                                {
+                                let has_active_count = get!(task, Activeness)
+                                    .is_some_and(|a| a.active_counter > 0);
+                                drop(task);
+                                if has_active_count {
                                     // TODO combine both operations to avoid the clone
                                     queue.push(AggregationUpdateJob::DecreaseActiveCounts {
                                         task_ids: children.clone(),
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs
index d7338d6cf54f9..8c211f3b50bc4 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs
@@ -1,25 +1,18 @@
-use std::{cmp::max, num::NonZeroU32};
-
 use serde::{Deserialize, Serialize};
 use turbo_tasks::TaskId;
 
 use crate::{
     backend::{
+        get_mut,
         operation::{
-            aggregation_update::{
-                get_uppers, is_aggregating_node, AggregationUpdateJob, AggregationUpdateQueue,
-                LEAF_NUMBER,
-            },
-            is_root_node, ExecuteContext, Operation, TaskGuard,
+            aggregation_update::{AggregationUpdateJob, AggregationUpdateQueue},
+            ExecuteContext, Operation, TaskGuard,
         },
-        storage::{count, get},
         TaskDataCategory,
     },
-    data::{CachedDataItem, CachedDataItemKey},
+    data::{CachedDataItem, CachedDataItemKey, InProgressState, InProgressStateInner},
 };
 
-const AGGREGATION_NUMBER_BUFFER_SPACE: u32 = 3;
-
 #[derive(Serialize, Deserialize, Clone, Default)]
 #[allow(clippy::large_enum_variant)]
 pub enum ConnectChildOperation {
@@ -45,111 +38,43 @@ impl ConnectChildOperation {
             return;
         }
         let mut parent_task = ctx.task(parent_task_id, TaskDataCategory::All);
+        let Some(InProgressState::InProgress(box InProgressStateInner { new_children, .. })) =
+            get_mut!(parent_task, InProgress)
+        else {
+            panic!("Task is not in progress while calling another task");
+        };
+
         // Quick skip if the child was already connected before
-        if parent_task
-            .remove(&CachedDataItemKey::OutdatedChild {
-                task: child_task_id,
-            })
-            .is_some()
-        {
+        if !new_children.insert(child_task_id) {
             return;
         }
 
-        if parent_task.add(CachedDataItem::Child {
+        if parent_task.has_key(&CachedDataItemKey::Child {
             task: child_task_id,
-            value: (),
         }) {
-            let mut queue = AggregationUpdateQueue::new();
-
-            if get!(parent_task, Activeness).is_some_and(|a| a.active_counter > 0) {
-                queue.push(AggregationUpdateJob::IncreaseActiveCount {
-                    task: child_task_id,
-                })
-            }
-
-            // Get the children count
-            let children_count = count!(parent_task, Child);
-
-            // Compute future parent aggregation number based on the number of children
-            let current_parent_aggregation = get!(parent_task, AggregationNumber)
-                .copied()
-                .unwrap_or_default();
-            let (parent_aggregation, future_parent_aggregation) =
-                if is_root_node(current_parent_aggregation.base) {
-                    (u32::MAX, u32::MAX)
-                } else {
-                    let target_distance = children_count.ilog2() * 2;
-                    if target_distance > current_parent_aggregation.distance {
-                        queue.push(AggregationUpdateJob::UpdateAggregationNumber {
-                            task_id: parent_task_id,
-                            base_aggregation_number: 0,
-                            distance: NonZeroU32::new(target_distance),
-                        })
-                    }
-                    (
-                        current_parent_aggregation.effective,
-                        current_parent_aggregation.base.saturating_add(max(
-                            target_distance,
-                            current_parent_aggregation.distance,
-                        )),
-                    )
-                };
-
-            // Update child aggregation number based on parent aggregation number
-            let aggregating_node = is_aggregating_node(parent_aggregation);
-            if parent_task_id.is_transient() && !child_task_id.is_transient() {
-                queue.push(AggregationUpdateJob::UpdateAggregationNumber {
-                    task_id: child_task_id,
-                    base_aggregation_number: u32::MAX,
-                    distance: None,
-                });
-            } else if !aggregating_node {
-                let base_aggregation_number =
-                    future_parent_aggregation.saturating_add(AGGREGATION_NUMBER_BUFFER_SPACE);
-                queue.push(AggregationUpdateJob::UpdateAggregationNumber {
-                    task_id: child_task_id,
-                    base_aggregation_number: if is_aggregating_node(
-                        base_aggregation_number.saturating_add(AGGREGATION_NUMBER_BUFFER_SPACE - 1),
-                    ) {
-                        LEAF_NUMBER
-                    } else {
-                        base_aggregation_number
-                    },
-                    distance: None,
-                });
-            }
-            if aggregating_node {
-                queue.push(AggregationUpdateJob::InnerOfUpperHasNewFollower {
-                    upper_id: parent_task_id,
-                    new_follower_id: child_task_id,
-                });
-            } else {
-                let upper_ids = get_uppers(&parent_task);
-                queue.push(AggregationUpdateJob::InnerOfUppersHasNewFollower {
-                    upper_ids,
-                    new_follower_id: child_task_id,
-                });
-            }
-            drop(parent_task);
+            // It is already connected, we can skip the rest
+            return;
+        }
+        drop(parent_task);
 
-        {
-            let mut task = ctx.task(child_task_id, TaskDataCategory::Meta);
-            if !task.has_key(&CachedDataItemKey::Output {}) {
-                let description = ctx.get_task_desc_fn(child_task_id);
-                let should_schedule = task.add(CachedDataItem::new_scheduled(description));
-                drop(task);
-                if should_schedule {
-                    ctx.schedule(child_task_id);
-                }
-            }
-        }
+        let mut queue = AggregationUpdateQueue::new();
 
+        // Handle the transient to persistent boundary by making the persistent task a root task
+        if parent_task_id.is_transient() && !child_task_id.is_transient() {
+            queue.push(AggregationUpdateJob::UpdateAggregationNumber {
+                task_id: child_task_id,
+                base_aggregation_number: u32::MAX,
+                distance: None,
+            });
+        }
 
+        queue.push(AggregationUpdateJob::IncreaseActiveCount {
+            task: child_task_id,
+        });
 
-        #[cfg(feature = "trace_aggregation_update")]
-        let _span = tracing::trace_span!("connect_child").entered();
-            ConnectChildOperation::UpdateAggregation {
-                aggregation_update: queue,
-            }
-            .execute(&mut ctx);
+        ConnectChildOperation::UpdateAggregation {
+            aggregation_update: queue,
         }
+        .execute(&mut ctx);
     }
 }
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_children.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_children.rs
new file mode 100644
index 0000000000000..b66d02a366800
--- /dev/null
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_children.rs
@@ -0,0 +1,113 @@
+use std::{cmp::max, num::NonZeroU32};
+
+use rustc_hash::FxHashSet;
+use smallvec::SmallVec;
+use turbo_tasks::TaskId;
+
+use crate::{
+    backend::{
+        get,
+        operation::{
+            aggregation_update::InnerOfUppersHasNewFollowersJob, get_uppers, is_aggregating_node,
+            is_root_node, AggregationUpdateJob, AggregationUpdateQueue, TaskGuard,
+        },
+    },
+    data::CachedDataItem,
+};
+
+const AGGREGATION_NUMBER_BUFFER_SPACE: u32 = 3;
+
+pub fn connect_children(
+    parent_task_id: TaskId,
+    parent_task: &mut impl TaskGuard,
+    new_children: FxHashSet<TaskId>,
+    queue: &mut AggregationUpdateQueue,
+    has_active_count: bool,
+) {
+    if new_children.is_empty() {
+        return;
+    }
+    let children_count = new_children.len();
+
+    // Compute future parent aggregation number based on the number of children
+    let current_parent_aggregation = get!(parent_task, AggregationNumber)
+        .copied()
+        .unwrap_or_default();
+    let (parent_aggregation, future_parent_aggregation) =
+        if is_root_node(current_parent_aggregation.base) {
+            (u32::MAX, u32::MAX)
+        } else {
+            let target_distance = children_count.ilog2() * 2;
+            if target_distance > current_parent_aggregation.distance {
+                queue.push(AggregationUpdateJob::UpdateAggregationNumber {
+                    task_id: parent_task_id,
+                    base_aggregation_number: 0,
+                    distance: NonZeroU32::new(target_distance),
+                })
+            }
+            (
+                current_parent_aggregation.effective,
+                current_parent_aggregation
+                    .base
+                    .saturating_add(max(target_distance, current_parent_aggregation.distance)),
+            )
+        };
+
+    // When the parent is a leaf node, we need to increase the aggregation number of the children
+    // so that they count from the parent's aggregation number.
+    if !is_aggregating_node(future_parent_aggregation) {
+        let child_base_aggregation_number =
+            future_parent_aggregation + AGGREGATION_NUMBER_BUFFER_SPACE;
+        for &new_child in new_children.iter() {
+            queue.push(AggregationUpdateJob::UpdateAggregationNumber {
+                task_id: new_child,
+                base_aggregation_number: child_base_aggregation_number,
+                distance: None,
+            });
+        }
+    };
+
+    for &new_child in new_children.iter() {
+        parent_task.add_new(CachedDataItem::Child {
+            task: new_child,
+            value: (),
+        });
+    }
+
+    let new_follower_ids: SmallVec<_> = new_children.iter().copied().collect();
+
+    let aggregating_node = is_aggregating_node(parent_aggregation);
+    let upper_ids = (!aggregating_node).then(|| get_uppers(&*parent_task));
+
+    if let Some(upper_ids) = upper_ids {
+        // Parent is a leaf node, the children are followers of it now.
+        if !upper_ids.is_empty() {
+            queue.push(
+                InnerOfUppersHasNewFollowersJob {
+                    upper_ids,
+                    new_follower_ids: new_follower_ids.clone(),
+                }
+                .into(),
+            );
+        }
+        // We need to decrease the active count because we temporarily increased it during
+        // connect_child.
+        // When the parent has an active count, the children keep that increment, because they
+        // are now added as followers.
+        if !has_active_count {
+            queue.push(AggregationUpdateJob::DecreaseActiveCounts {
+                task_ids: new_follower_ids,
+            })
+        }
+    } else {
+        // Parent is an aggregating node. We run the normal code to connect the children.
+        queue.push(AggregationUpdateJob::InnerOfUpperHasNewFollowers {
+            upper_id: parent_task_id,
+            new_follower_ids: new_follower_ids.clone(),
+        });
+        // We need to decrease the active count because we temporarily increased it during
+        // connect_child.
+        queue.push(AggregationUpdateJob::DecreaseActiveCounts {
+            task_ids: new_follower_ids,
+        })
+    }
+}
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs
index f34eeecfd9859..451e3f7a14157 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs
@@ -13,7 +13,10 @@ use crate::{
         storage::{get, get_mut},
         TaskDataCategory,
     },
-    data::{CachedDataItem, CachedDataItemKey, CachedDataItemValue, DirtyState, InProgressState},
+    data::{
+        CachedDataItem, CachedDataItemKey, CachedDataItemValue, DirtyState, InProgressState,
+        InProgressStateInner,
+    },
 };
 
 #[derive(Serialize, Deserialize, Clone, Default)]
@@ -196,7 +199,9 @@ pub fn make_task_dirty_internal(
     ctx: &impl ExecuteContext,
 ) {
     if make_stale {
-        if let Some(InProgressState::InProgress { stale, .. }) = get_mut!(task, InProgress) {
+        if let Some(InProgressState::InProgress(box InProgressStateInner { stale, .. })) =
+            get_mut!(task, InProgress)
+        {
             if !*stale {
                 #[cfg(feature = "trace_task_dirty")]
                 let _span = tracing::trace_span!(
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
index 2e73e53826eb8..03e34732cdfa3 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
@@ -1,6 +1,7 @@
 mod aggregation_update;
 mod cleanup_old_edges;
 mod connect_child;
+mod connect_children;
 mod invalidate;
 mod update_cell;
 mod update_collectible;
@@ -746,9 +747,11 @@ impl_operation!(AggregationUpdate aggregation_update::AggregationUpdateQueue);
 pub use self::invalidate::TaskDirtyCause;
 pub use self::{
     aggregation_update::{
-        get_aggregation_number, is_root_node, AggregatedDataUpdate, AggregationUpdateJob,
+        get_aggregation_number, get_uppers, is_aggregating_node, is_root_node,
+        AggregatedDataUpdate, AggregationUpdateJob,
     },
     cleanup_old_edges::OutdatedEdge,
+    connect_children::connect_children,
     update_cell::UpdateCellOperation,
     update_collectible::UpdateCollectibleOperation,
 };
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_output.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_output.rs
index 55fa85e25cb6d..3fe9d8fd76d20 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_output.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_output.rs
@@ -15,7 +15,10 @@ use crate::{
         storage::{get, get_many},
         TaskDataCategory,
     },
-    data::{CachedDataItem, CachedDataItemKey, CellRef, InProgressState, OutputValue},
+    data::{
+        CachedDataItem, CachedDataItemKey, CellRef, InProgressState, InProgressStateInner,
+        OutputValue,
+    },
 };
 
 #[derive(Serialize, Deserialize, Clone, Default)]
@@ -45,10 +48,23 @@ impl UpdateOutputOperation {
         mut ctx: impl ExecuteContext,
     ) {
         let mut task = ctx.task(task_id, TaskDataCategory::Meta);
-        if let Some(InProgressState::InProgress { stale: true, .. }) = get!(task, InProgress) {
+        let Some(InProgressState::InProgress(box InProgressStateInner {
+            stale,
+            new_children,
+            ..
+        })) = get!(task, InProgress)
+        else {
+            panic!("Task is not in progress while updating the output");
+        };
+        if *stale {
             // Skip updating the output when the task is stale
             return;
         }
+        let children = ctx
+            .should_track_children()
+            .then(|| new_children.iter().copied().collect())
+            .unwrap_or_default();
+
         let old_error = task.remove(&CachedDataItemKey::Error {});
         let current_output = get!(task, Output);
         let output_value = match output {
@@ -109,10 +125,6 @@ impl UpdateOutputOperation {
             .should_track_dependencies()
             .then(|| get_many!(task, OutputDependent { task } => task))
             .unwrap_or_default();
-        let children = ctx
-            .should_track_children()
-            .then(|| get_many!(task, Child { task } => task))
-            .unwrap_or_default();
 
         let mut queue = AggregationUpdateQueue::new();
 
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
index 921cd612bde6b..f0b937139ba6f 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
@@ -588,7 +588,7 @@ macro_rules! get_mut {
         use $crate::backend::operation::TaskGuard;
         if let Some($crate::data::CachedDataItemValueRefMut::$key {
             value,
-        }) = $task.get_mut(&$crate::data::CachedDataItemKey::$key $input).as_mut() {
+        }) = $task.get_mut(&$crate::data::CachedDataItemKey::$key $input) {
             let () = $crate::data::allow_mut_access::$key;
             Some(value)
         } else {
diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs
index 3c02fcfe1cf47..b3effc4b17226 100644
--- a/turbopack/crates/turbo-tasks-backend/src/data.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/data.rs
@@ -1,5 +1,6 @@
 use std::cmp::Ordering;
 
+use rustc_hash::FxHashSet;
 use serde::{Deserialize, Serialize};
 use turbo_tasks::{
     event::{Event, EventListener},
@@ -302,19 +303,23 @@ pub enum RootType {
     OnceTask,
 }
 
+#[derive(Debug)]
+pub struct InProgressStateInner {
+    pub stale: bool,
+    #[allow(dead_code)]
+    pub once_task: bool,
+    pub session_dependent: bool,
+    pub marked_as_completed: bool,
+    pub done_event: Event,
+    /// Children that should be connected to the task and have their active_count decremented
+    /// once the task completes.
+    pub new_children: FxHashSet<TaskId>,
+}
+
 #[derive(Debug)]
 pub enum InProgressState {
-    Scheduled {
-        done_event: Event,
-    },
-    InProgress {
-        stale: bool,
-        #[allow(dead_code)]
-        once_task: bool,
-        session_dependent: bool,
-        marked_as_completed: bool,
-        done_event: Event,
-    },
+    Scheduled { done_event: Event },
+    InProgress(Box<InProgressStateInner>),
 }
 
 transient_traits!(InProgressState);
@@ -471,11 +476,6 @@ pub enum CachedDataItem {
         target: CollectiblesRef,
         value: (),
     },
-    #[serde(skip)]
-    OutdatedChild {
-        task: TaskId,
-        value: (),
-    },
 
     // Transient Error State
     #[serde(skip)]
@@ -516,7 +516,6 @@ impl CachedDataItem {
             CachedDataItem::OutdatedOutputDependency { .. } => false,
             CachedDataItem::OutdatedCellDependency { .. } => false,
             CachedDataItem::OutdatedCollectiblesDependency { .. } => false,
-            CachedDataItem::OutdatedChild { .. } => false,
             CachedDataItem::Error { .. } => false,
         }
     }
@@ -569,7 +568,6 @@ impl CachedDataItem {
             | Self::OutdatedOutputDependency { .. }
            | Self::OutdatedCellDependency { ..
 }
             | Self::OutdatedCollectiblesDependency { .. }
-            | Self::OutdatedChild { .. }
             | Self::InProgressCell { .. }
             | Self::InProgress { .. }
             | Self::Error { .. }
@@ -610,7 +608,6 @@ impl CachedDataItemKey {
             CachedDataItemKey::OutdatedOutputDependency { .. } => false,
             CachedDataItemKey::OutdatedCellDependency { .. } => false,
             CachedDataItemKey::OutdatedCollectiblesDependency { .. } => false,
-            CachedDataItemKey::OutdatedChild { .. } => false,
             CachedDataItemKey::Error { .. } => false,
         }
     }
@@ -651,7 +648,6 @@ impl CachedDataItemType {
             | Self::OutdatedOutputDependency { .. }
             | Self::OutdatedCellDependency { .. }
             | Self::OutdatedCollectiblesDependency { .. }
-            | Self::OutdatedChild { .. }
             | Self::InProgressCell { .. }
             | Self::InProgress { .. }
             | Self::Error { .. }
diff --git a/turbopack/crates/turbo-tasks-testing/tests/basic.rs b/turbopack/crates/turbo-tasks-testing/tests/basic.rs
index e1ea72133a2e0..5af06034d8ce6 100644
--- a/turbopack/crates/turbo-tasks-testing/tests/basic.rs
+++ b/turbopack/crates/turbo-tasks-testing/tests/basic.rs
@@ -21,6 +21,12 @@ async fn basic() {
         let output3 = func_persistent(output1);
         assert_eq!(output3.await?.value, 123);
 
+        let output4 = nested_func_without_args_waiting();
+        assert_eq!(output4.await?.value, 123);
+
+        let output5 = nested_func_without_args_non_waiting();
+        assert_eq!(output5.await?.value, 123);
+
         anyhow::Ok(())
     })
     .await
@@ -28,6 +34,7 @@ async fn basic() {
 }
 
 #[turbo_tasks::value]
+#[derive(Clone, Debug)]
 struct Value {
     value: u32,
 }
@@ -52,3 +59,16 @@ async fn func_without_args() -> Result<Vc<Value>> {
     let value = 123;
     Ok(Value { value }.cell())
 }
+
+#[turbo_tasks::function]
+async fn nested_func_without_args_waiting() -> Result<Vc<Value>> {
+    println!("nested_func_without_args_waiting");
+    let value = func_without_args().await?.clone_value();
+    Ok(value.cell())
+}
+
+#[turbo_tasks::function]
+async fn nested_func_without_args_non_waiting() -> Result<Vc<Value>> {
+    println!("nested_func_without_args_non_waiting");
+    Ok(func_without_args())
+}
diff --git a/turbopack/crates/turbo-tasks-testing/tests/emptied_cells.rs b/turbopack/crates/turbo-tasks-testing/tests/emptied_cells.rs
index 4b7b33da76857..24bc65a016d72 100644
--- a/turbopack/crates/turbo-tasks-testing/tests/emptied_cells.rs
+++ b/turbopack/crates/turbo-tasks-testing/tests/emptied_cells.rs
@@ -51,12 +51,14 @@ struct ChangingInput {
 
 #[turbo_tasks::function]
 async fn compute(input: Vc<ChangingInput>) -> Result<Vc<u32>> {
+    println!("compute()");
     let value = *inner_compute(input).await?;
     Ok(Vc::cell(value))
 }
 
 #[turbo_tasks::function]
 async fn inner_compute(input: Vc<ChangingInput>) -> Result<Vc<u32>> {
+    println!("inner_compute()");
     let state_value = *input.await?.state.get();
     let mut last = None;
     for i in 0..=state_value {
@@ -67,6 +69,7 @@ async fn compute2(input: Vc<u32>) -> Result<Vc<u32>> {
+    println!("compute2()");
     let value = *input.await?;
     Ok(Vc::cell(value))
 }
diff --git a/turbopack/crates/turbo-tasks/src/event.rs b/turbopack/crates/turbo-tasks/src/event.rs
index 6bc584ad23a6c..fac45de1fa149 100644
--- a/turbopack/crates/turbo-tasks/src/event.rs
+++ b/turbopack/crates/turbo-tasks/src/event.rs
@@ -154,7 +154,7 @@ pub struct EventListener {
     note: Arc<dyn Fn() -> String + Sync + Send>,
     // Timeout needs to stay pinned while polling and also while it's dropped.
     // So it's important to put it into a pinned Box to be able to take it out of the Option.
-    future: Option<Pin<Box<Timeout<event_listener::EventListener>>>>,
+    future: Option<std::pin::Pin<Box<Timeout<event_listener::EventListener>>>>,
     duration: Duration,
 }
 
@@ -200,7 +200,7 @@ impl Future for EventListener {
                     self.duration,
                     // SAFETY: We can move the inner future since it's an EventListener and
                     // that is Unpin.
-                    unsafe { Pin::into_inner_unchecked(future) }.into_inner(),
+                    unsafe { std::pin::Pin::into_inner_unchecked(future) }.into_inner(),
                 )));
             }
         }
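
A note on the #[serde(skip)]/Noop pattern introduced in aggregation_update.rs above: serde returns an error when asked to serialize an enum value whose variant is marked #[serde(skip)], so once the active-count jobs became transient the queue could no longer rely on the derived Serialize alone; the custom serialize_with function substitutes a placeholder instead. A minimal self-contained sketch of the same mechanism, assuming serde with the derive feature plus serde_json for the demo (the Job/Queue names are illustrative, not from this diff):

    use std::collections::VecDeque;

    use serde::{ser::SerializeSeq, Deserialize, Serialize, Serializer};

    #[derive(Serialize, Deserialize)]
    enum Job {
        Persistent { id: u32 },
        // Serializing a value of this variant directly returns an error,
        // which is why the queue serializer below swaps it out.
        #[serde(skip)]
        Transient { id: u32 },
        // Placeholder that deserializes into a harmless do-nothing job.
        Noop,
    }

    fn serialize_jobs<S: Serializer>(jobs: &VecDeque<Job>, serializer: S) -> Result<S::Ok, S::Error> {
        let mut seq = serializer.serialize_seq(Some(jobs.len()))?;
        for job in jobs {
            match job {
                // Replace transient jobs instead of failing the whole sequence.
                Job::Transient { .. } => seq.serialize_element(&Job::Noop)?,
                job => seq.serialize_element(job)?,
            }
        }
        seq.end()
    }

    #[derive(Serialize, Deserialize)]
    struct Queue {
        #[serde(serialize_with = "serialize_jobs")]
        jobs: VecDeque<Job>,
    }

    fn main() {
        let queue = Queue {
            jobs: VecDeque::from([Job::Persistent { id: 1 }, Job::Transient { id: 2 }]),
        };
        // Prints {"jobs":[{"Persistent":{"id":1}},"Noop"]}
        println!("{}", serde_json::to_string(&queue).unwrap());
    }

This is also why process gained the empty AggregationUpdateJob::Noop => {} arm: a queue restored from persistent storage may contain such placeholders, and they must simply be skipped.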
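On the switch from InProgressState::InProgress { ... } to InProgress(Box<InProgressStateInner>) in data.rs: a Rust enum is as large as its largest variant, and the in-progress payload grew by a whole FxHashSet for new_children, so boxing keeps every other InProgressState value pointer-sized. A toy illustration of that size effect (hypothetical types; exact byte counts are platform-dependent):

    use std::collections::HashSet;
    use std::mem::size_of;

    #[allow(dead_code)]
    enum Inline {
        Scheduled { flag: bool },
        // Every Inline value reserves space for the HashSet (48 bytes on a
        // typical 64-bit target), even when it is only `Scheduled`.
        InProgress { stale: bool, new_children: HashSet<u32> },
    }

    #[allow(dead_code)]
    struct Inner {
        stale: bool,
        new_children: HashSet<u32>,
    }

    #[allow(dead_code)]
    enum Boxed {
        Scheduled { flag: bool },
        // One pointer instead of the full payload.
        InProgress(Box<Inner>),
    }

    fn main() {
        assert!(size_of::<Boxed>() < size_of::<Inline>());
        println!("inline: {} bytes, boxed: {} bytes", size_of::<Inline>(), size_of::<Boxed>());
    }

The box InProgressStateInner { .. } patterns used throughout the diff rely on the nightly box_patterns feature, which lets match arms destructure through the Box without an explicit deref.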
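On the connect_children heuristic: the parent's target distance is children_count.ilog2() * 2, so the aggregation number only needs to grow logarithmically as tasks fan out, and the early return on an empty new_children set also keeps ilog2() away from its zero-input panic. A quick check of the values the formula as written produces (assuming plain integer ilog2 semantics):

    fn target_distance(children_count: usize) -> u32 {
        // Mirrors `children_count.ilog2() * 2` from connect_children;
        // panics on 0, which the empty-set early return rules out.
        children_count.ilog2() * 2
    }

    fn main() {
        assert_eq!(target_distance(1), 0);
        assert_eq!(target_distance(8), 6);
        assert_eq!(target_distance(1024), 20);
        // The queued UpdateAggregationNumber job only fires when this target
        // exceeds the parent's current distance.
    }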