diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index ed2ab3804714f..cb8d68eed3010 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -39,9 +39,10 @@ pub use self::{operation::AnyOperation, storage::TaskDataCategory}; use crate::{ backend::{ operation::{ - get_aggregation_number, is_root_node, AggregatedDataUpdate, AggregationUpdateJob, - AggregationUpdateQueue, CleanupOldEdgesOperation, ConnectChildOperation, - ExecuteContext, ExecuteContextImpl, Operation, OutdatedEdge, TaskDirtyCause, TaskGuard, + connect_children, get_aggregation_number, is_root_node, AggregatedDataUpdate, + AggregationUpdateJob, AggregationUpdateQueue, CleanupOldEdgesOperation, + ConnectChildOperation, ExecuteContext, ExecuteContextImpl, Operation, OutdatedEdge, + TaskDirtyCause, TaskGuard, }, persisted_storage_log::PersistedStorageLog, storage::{get, get_many, get_mut, get_mut_or_insert_with, iter_many, remove, Storage}, @@ -50,7 +51,8 @@ use crate::{ data::{ ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType, CachedDataItemValue, CachedDataItemValueRef, CachedDataUpdate, CellRef, CollectibleRef, - CollectiblesRef, DirtyState, InProgressCellState, InProgressState, OutputValue, RootType, + CollectiblesRef, DirtyState, InProgressCellState, InProgressState, InProgressStateInner, + OutputValue, RootType, }, utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc, sharded::Sharded}, }; @@ -395,21 +397,31 @@ impl TurboTasksBackendInner { let mut ctx = self.execute_context(turbo_tasks); let mut task = ctx.task(task_id, TaskDataCategory::All); - if let Some(in_progress) = get!(task, InProgress) { - match in_progress { - InProgressState::Scheduled { done_event, .. } - | InProgressState::InProgress { done_event, .. } => { - let reader_desc = reader.map(|r| self.get_task_desc_fn(r)); - let listener = done_event.listen_with_note(move || { - if let Some(reader_desc) = reader_desc.as_ref() { - format!("try_read_task_output from {}", reader_desc()) - } else { - "try_read_task_output (untracked)".to_string() - } - }); - return Ok(Err(listener)); + fn check_in_progress( + this: &TurboTasksBackendInner, + task: &impl TaskGuard, + reader: Option, + ) -> Option, anyhow::Error>> + { + if let Some(in_progress) = get!(task, InProgress) { + match in_progress { + InProgressState::Scheduled { done_event, .. } + | InProgressState::InProgress(box InProgressStateInner { + done_event, .. + }) => { + let reader_desc = reader.map(|r| this.get_task_desc_fn(r)); + let listener = done_event.listen_with_note(move || { + if let Some(reader_desc) = reader_desc.as_ref() { + format!("try_read_task_output from {}", reader_desc()) + } else { + "try_read_task_output (untracked)".to_string() + } + }); + return Some(Ok(Err(listener))); + } } } + None } if self.should_track_children() && matches!(consistency, ReadConsistency::Strong) { @@ -489,6 +501,10 @@ impl TurboTasksBackendInner { } } + if let Some(value) = check_in_progress(self, &task, reader) { + return value; + } + if let Some(output) = get!(task, Output) { let result = match output { OutputValue::Cell(cell) => Some(Ok(Ok(RawVc::TaskCell(cell.task, cell.cell)))), @@ -992,40 +1008,17 @@ impl TurboTasksBackendInner { return None; }; task.add_new(CachedDataItem::InProgress { - value: InProgressState::InProgress { + value: InProgressState::InProgress(Box::new(InProgressStateInner { stale: false, once_task, done_event, session_dependent: false, marked_as_completed: false, - }, + new_children: Default::default(), + })), }); if self.should_track_children() { - // Make all current children outdated (remove left-over outdated children) - enum Child { - Current(TaskId), - Outdated(TaskId), - } - let children = iter_many!(task, Child { task } => Child::Current(task)) - .chain(iter_many!(task, OutdatedChild { task } => Child::Outdated(task))) - .collect::>(); - for child in children { - match child { - Child::Current(child) => { - let _ = task.add(CachedDataItem::OutdatedChild { - task: child, - value: (), - }); - } - Child::Outdated(child) => { - if !task.has_key(&CachedDataItemKey::Child { task: child }) { - task.remove(&CachedDataItemKey::OutdatedChild { task: child }); - } - } - } - } - // Make all current collectibles outdated (remove left-over outdated collectibles) enum Collectible { Current(CollectibleRef, i32), @@ -1192,24 +1185,43 @@ impl TurboTasksBackendInner { ) -> bool { let mut ctx = self.execute_context(turbo_tasks); let mut task = ctx.task(task_id, TaskDataCategory::All); - let Some(in_progress) = get!(task, InProgress) else { + let Some(in_progress) = get_mut!(task, InProgress) else { panic!("Task execution completed, but task is not in progress: {task:#?}"); }; - let &InProgressState::InProgress { stale, .. } = in_progress else { + let &mut InProgressState::InProgress(box InProgressStateInner { + stale, + ref mut new_children, + .. + }) = in_progress + else { panic!("Task execution completed, but task is not in progress: {task:#?}"); }; // If the task is stale, reschedule it if stale { - let Some(InProgressState::InProgress { done_event, .. }) = remove!(task, InProgress) + let Some(InProgressState::InProgress(box InProgressStateInner { + done_event, + new_children, + .. + })) = remove!(task, InProgress) else { unreachable!(); }; task.add_new(CachedDataItem::InProgress { value: InProgressState::Scheduled { done_event }, }); + + // All `new_children` are currently hold active with an active count and we need to undo + // that. + AggregationUpdateQueue::run( + AggregationUpdateJob::DecreaseActiveCounts { + task_ids: new_children.into_iter().collect(), + }, + &mut ctx, + ); return true; } + let mut new_children = take(new_children); // TODO handle stateful let _ = stateful; @@ -1236,9 +1248,24 @@ impl TurboTasksBackendInner { task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type }); } - let mut removed_data: Vec = Vec::new(); + let mut queue = AggregationUpdateQueue::new(); + + let mut removed_data = Vec::new(); let mut old_edges = Vec::new(); + // Connect children + { + for old_child in iter_many!(task, Child { task } => task) { + if !new_children.remove(&old_child) { + old_edges.push(OutdatedEdge::Child(old_child)); + } + } + + let active_count = + get!(task, Activeness).map_or(0, |activeness| activeness.active_counter); + connect_children(task_id, &mut task, new_children, &mut queue, active_count); + } + // Remove no longer existing cells and notify in progress cells // find all outdated data items (removed cells, outdated edges) removed_data.extend( @@ -1263,12 +1290,6 @@ impl TurboTasksBackendInner { .get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index)) })); if self.should_track_children() { - old_edges.extend(task.iter(CachedDataItemType::OutdatedChild).filter_map( - |(key, _)| match key { - CachedDataItemKey::OutdatedChild { task } => Some(OutdatedEdge::Child(task)), - _ => None, - }, - )); old_edges.extend( task.iter(CachedDataItemType::OutdatedCollectible) .filter_map(|(key, value)| match (key, value) { @@ -1301,10 +1322,13 @@ impl TurboTasksBackendInner { drop(task); - // Remove outdated edges first, before removing in_progress+dirty flag. - // We need to make sure all outdated edges are removed before the task can potentially be - // scheduled and executed again - CleanupOldEdgesOperation::run(task_id, old_edges, &mut ctx); + { + let _span = tracing::trace_span!("CleanupOldEdgesOperation").entered(); + // Remove outdated edges first, before removing in_progress+dirty flag. + // We need to make sure all outdated edges are removed before the task can potentially + // be scheduled and executed again + CleanupOldEdgesOperation::run(task_id, old_edges, queue, &mut ctx); + } // When restoring from persistent caching the following might not be executed (since we can // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and @@ -1314,16 +1338,18 @@ impl TurboTasksBackendInner { let Some(in_progress) = remove!(task, InProgress) else { panic!("Task execution completed, but task is not in progress: {task:#?}"); }; - let InProgressState::InProgress { + let InProgressState::InProgress(box InProgressStateInner { done_event, once_task: _, stale, session_dependent, marked_as_completed: _, - } = in_progress + new_children, + }) = in_progress else { panic!("Task execution completed, but task is not in progress: {task:#?}"); }; + debug_assert!(new_children.is_empty()); // If the task is stale, reschedule it if stale { @@ -1663,9 +1689,10 @@ impl TurboTasksBackendInner { ) { let mut ctx = self.execute_context(turbo_tasks); let mut task = ctx.task(task, TaskDataCategory::Data); - if let Some(InProgressState::InProgress { - session_dependent, .. - }) = get_mut!(task, InProgress) + if let Some(InProgressState::InProgress(box InProgressStateInner { + session_dependent, + .. + })) = get_mut!(task, InProgress) { *session_dependent = true; } @@ -1678,10 +1705,10 @@ impl TurboTasksBackendInner { ) { let mut ctx = self.execute_context(turbo_tasks); let mut task = ctx.task(task, TaskDataCategory::Data); - if let Some(InProgressState::InProgress { + if let Some(InProgressState::InProgress(box InProgressStateInner { marked_as_completed, .. - }) = get_mut!(task, InProgress) + })) = get_mut!(task, InProgress) { *marked_as_completed = true; // TODO this should remove the dirty state (also check session_dependent) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs index 7b9113451d8b2..d537d1babaae2 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs @@ -46,8 +46,12 @@ pub enum OutdatedEdge { } impl CleanupOldEdgesOperation { - pub fn run(task_id: TaskId, outdated: Vec, ctx: &mut impl ExecuteContext) { - let queue = AggregationUpdateQueue::new(); + pub fn run( + task_id: TaskId, + outdated: Vec, + queue: AggregationUpdateQueue, + ctx: &mut impl ExecuteContext, + ) { CleanupOldEdgesOperation::RemoveEdges { task_id, outdated, diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs index d7338d6cf54f9..4dfc70df715a1 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs @@ -1,25 +1,18 @@ -use std::{cmp::max, num::NonZeroU32}; - use serde::{Deserialize, Serialize}; use turbo_tasks::TaskId; use crate::{ backend::{ + get_mut, operation::{ - aggregation_update::{ - get_uppers, is_aggregating_node, AggregationUpdateJob, AggregationUpdateQueue, - LEAF_NUMBER, - }, - is_root_node, ExecuteContext, Operation, TaskGuard, + aggregation_update::{AggregationUpdateJob, AggregationUpdateQueue}, + ExecuteContext, Operation, TaskGuard, }, - storage::{count, get}, TaskDataCategory, }, - data::{CachedDataItem, CachedDataItemKey}, + data::{CachedDataItem, CachedDataItemKey, InProgressState, InProgressStateInner}, }; -const AGGREGATION_NUMBER_BUFFER_SPACE: u32 = 3; - #[derive(Serialize, Deserialize, Clone, Default)] #[allow(clippy::large_enum_variant)] pub enum ConnectChildOperation { @@ -44,112 +37,38 @@ impl ConnectChildOperation { } return; } - let mut parent_task = ctx.task(parent_task_id, TaskDataCategory::All); + let mut parent_task = ctx.task(parent_task_id, TaskDataCategory::Meta); + let Some(InProgressState::InProgress(box InProgressStateInner { new_children, .. })) = + get_mut!(parent_task, InProgress) + else { + panic!("Task is not in progress while calling another task"); + }; + // Quick skip if the child was already connected before - if parent_task - .remove(&CachedDataItemKey::OutdatedChild { - task: child_task_id, - }) - .is_some() - { + if !new_children.insert(child_task_id) { return; } - if parent_task.add(CachedDataItem::Child { - task: child_task_id, - value: (), - }) { - let mut queue = AggregationUpdateQueue::new(); + drop(parent_task); - if get!(parent_task, Activeness).is_some_and(|a| a.active_counter > 0) { - queue.push(AggregationUpdateJob::IncreaseActiveCount { - task: child_task_id, - }) - } - - // Get the children count - let children_count = count!(parent_task, Child); - - // Compute future parent aggregation number based on the number of children - let current_parent_aggregation = get!(parent_task, AggregationNumber) - .copied() - .unwrap_or_default(); - let (parent_aggregation, future_parent_aggregation) = - if is_root_node(current_parent_aggregation.base) { - (u32::MAX, u32::MAX) - } else { - let target_distance = children_count.ilog2() * 2; - if target_distance > current_parent_aggregation.distance { - queue.push(AggregationUpdateJob::UpdateAggregationNumber { - task_id: parent_task_id, - base_aggregation_number: 0, - distance: NonZeroU32::new(target_distance), - }) - } - ( - current_parent_aggregation.effective, - current_parent_aggregation.base.saturating_add(max( - target_distance, - current_parent_aggregation.distance, - )), - ) - }; + let mut queue = AggregationUpdateQueue::new(); - // Update child aggregation number based on parent aggregation number - let aggregating_node = is_aggregating_node(parent_aggregation); - if parent_task_id.is_transient() && !child_task_id.is_transient() { - queue.push(AggregationUpdateJob::UpdateAggregationNumber { - task_id: child_task_id, - base_aggregation_number: u32::MAX, - distance: None, - }); - } else if !aggregating_node { - let base_aggregation_number = - future_parent_aggregation.saturating_add(AGGREGATION_NUMBER_BUFFER_SPACE); - queue.push(AggregationUpdateJob::UpdateAggregationNumber { - task_id: child_task_id, - base_aggregation_number: if is_aggregating_node( - base_aggregation_number.saturating_add(AGGREGATION_NUMBER_BUFFER_SPACE - 1), - ) { - LEAF_NUMBER - } else { - base_aggregation_number - }, - distance: None, - }); - } - if aggregating_node { - queue.push(AggregationUpdateJob::InnerOfUpperHasNewFollower { - upper_id: parent_task_id, - new_follower_id: child_task_id, - }); - } else { - let upper_ids = get_uppers(&parent_task); - queue.push(AggregationUpdateJob::InnerOfUppersHasNewFollower { - upper_ids, - new_follower_id: child_task_id, - }); - } - drop(parent_task); + // Handle the transient to persistent boundary by making the persistent task a root task + if parent_task_id.is_transient() && !child_task_id.is_transient() { + queue.push(AggregationUpdateJob::UpdateAggregationNumber { + task_id: child_task_id, + base_aggregation_number: u32::MAX, + distance: None, + }); + } - { - let mut task = ctx.task(child_task_id, TaskDataCategory::Meta); - if !task.has_key(&CachedDataItemKey::Output {}) { - let description = ctx.get_task_desc_fn(child_task_id); - let should_schedule = task.add(CachedDataItem::new_scheduled(description)); - drop(task); - if should_schedule { - ctx.schedule(child_task_id); - } - } - } + queue.push(AggregationUpdateJob::IncreaseActiveCount { + task: child_task_id, + }); - #[cfg(feature = "trace_aggregation_update")] - let _span = tracing::trace_span!("connect_child").entered(); - ConnectChildOperation::UpdateAggregation { - aggregation_update: queue, - } - .execute(&mut ctx); + ConnectChildOperation::UpdateAggregation { + aggregation_update: queue, } + .execute(&mut ctx); } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_children.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_children.rs new file mode 100644 index 0000000000000..60aa9f43f80f7 --- /dev/null +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_children.rs @@ -0,0 +1,100 @@ +use std::{cmp::max, num::NonZeroU32}; + +use rustc_hash::FxHashSet; +use turbo_tasks::TaskId; + +use crate::{ + backend::{ + get, + operation::{ + get_uppers, is_aggregating_node, is_root_node, AggregationUpdateJob, + AggregationUpdateQueue, TaskGuard, + }, + }, + data::CachedDataItem, +}; + +const AGGREGATION_NUMBER_BUFFER_SPACE: u32 = 3; + +pub fn connect_children( + parent_task_id: TaskId, + parent_task: &mut impl TaskGuard, + new_children: FxHashSet, + queue: &mut AggregationUpdateQueue, + active_count: i32, +) { + if new_children.is_empty() { + return; + } + let children_count = new_children.len(); + + // Compute future parent aggregation number based on the number of children + let current_parent_aggregation = get!(parent_task, AggregationNumber) + .copied() + .unwrap_or_default(); + let (parent_aggregation, future_parent_aggregation) = + if is_root_node(current_parent_aggregation.base) { + (u32::MAX, u32::MAX) + } else { + let target_distance = children_count.ilog2() * 2; + if target_distance > current_parent_aggregation.distance { + queue.push(AggregationUpdateJob::UpdateAggregationNumber { + task_id: parent_task_id, + base_aggregation_number: 0, + distance: NonZeroU32::new(target_distance), + }) + } + ( + current_parent_aggregation.effective, + current_parent_aggregation + .base + .saturating_add(max(target_distance, current_parent_aggregation.distance)), + ) + }; + + // When the parent is a leaf node, we need to increase the aggregation number of the children to + // be counting from the parent's aggregation number. + if !is_aggregating_node(future_parent_aggregation) { + let child_base_aggregation_number = + future_parent_aggregation + AGGREGATION_NUMBER_BUFFER_SPACE; + for &new_child in new_children.iter() { + queue.push(AggregationUpdateJob::UpdateAggregationNumber { + task_id: new_child, + base_aggregation_number: child_base_aggregation_number, + distance: None, + }); + } + }; + + for &new_child in new_children.iter() { + parent_task.add_new(CachedDataItem::Child { + task: new_child, + value: (), + }); + } + + let new_follower_ids: Vec<_> = new_children.iter().copied().collect(); + + let aggregating_node = is_aggregating_node(parent_aggregation); + let upper_ids = (!aggregating_node).then(|| get_uppers(&*parent_task)); + + if let Some(upper_ids) = upper_ids { + if !upper_ids.is_empty() { + queue.push(AggregationUpdateJob::InnerOfUppersHasNewFollowers { + upper_ids, + new_follower_ids: new_follower_ids.clone(), + }); + } + } else { + queue.push(AggregationUpdateJob::InnerOfUpperHasNewFollowers { + upper_id: parent_task_id, + new_follower_ids: new_follower_ids.clone(), + }); + } + + if active_count == 0 { + queue.push(AggregationUpdateJob::DecreaseActiveCounts { + task_ids: new_follower_ids, + }) + } +} diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs index 1693d0c81b315..fb836b39f04ac 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs @@ -15,7 +15,10 @@ use crate::{ storage::{get, get_mut}, TaskDataCategory, }, - data::{CachedDataItem, CachedDataItemKey, CachedDataItemValue, DirtyState, InProgressState}, + data::{ + CachedDataItem, CachedDataItemKey, CachedDataItemValue, DirtyState, InProgressState, + InProgressStateInner, + }, }; #[derive(Serialize, Deserialize, Clone, Default)] @@ -162,7 +165,9 @@ pub fn make_task_dirty_internal( ctx: &impl ExecuteContext, ) { if make_stale { - if let Some(InProgressState::InProgress { stale, .. }) = get_mut!(task, InProgress) { + if let Some(InProgressState::InProgress(box InProgressStateInner { stale, .. })) = + get_mut!(task, InProgress) + { if !*stale { let _span = tracing::trace_span!( "make task stale", diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index e8752ec7b0630..78fe2699944cc 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -1,6 +1,7 @@ mod aggregation_update; mod cleanup_old_edges; mod connect_child; +mod connect_children; mod invalidate; mod update_cell; mod update_collectible; @@ -744,9 +745,11 @@ impl_operation!(AggregationUpdate aggregation_update::AggregationUpdateQueue); pub use self::{ aggregation_update::{ - get_aggregation_number, is_root_node, AggregatedDataUpdate, AggregationUpdateJob, + get_aggregation_number, get_uppers, is_aggregating_node, is_root_node, + AggregatedDataUpdate, AggregationUpdateJob, }, cleanup_old_edges::OutdatedEdge, + connect_children::connect_children, invalidate::TaskDirtyCause, update_cell::UpdateCellOperation, update_collectible::UpdateCollectibleOperation, diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_output.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_output.rs index 79637c5862a0b..96d3d58ab734f 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_output.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_output.rs @@ -13,7 +13,10 @@ use crate::{ storage::{get, get_many}, TaskDataCategory, }, - data::{CachedDataItem, CachedDataItemKey, CellRef, InProgressState, OutputValue}, + data::{ + CachedDataItem, CachedDataItemKey, CellRef, InProgressState, InProgressStateInner, + OutputValue, + }, }; #[derive(Serialize, Deserialize, Clone, Default)] @@ -42,10 +45,23 @@ impl UpdateOutputOperation { mut ctx: impl ExecuteContext, ) { let mut task = ctx.task(task_id, TaskDataCategory::Meta); - if let Some(InProgressState::InProgress { stale: true, .. }) = get!(task, InProgress) { + let Some(InProgressState::InProgress(box InProgressStateInner { + stale, + new_children, + .. + })) = get!(task, InProgress) + else { + panic!("Task is not in progress while updating the output"); + }; + if *stale { // Skip updating the output when the task is stale return; } + let children = ctx + .should_track_children() + .then(|| new_children.iter().copied().collect()) + .unwrap_or_default(); + let old_error = task.remove(&CachedDataItemKey::Error {}); let current_output = get!(task, Output); let output_value = match output { @@ -106,10 +122,6 @@ impl UpdateOutputOperation { .should_track_dependencies() .then(|| get_many!(task, OutputDependent { task } => task)) .unwrap_or_default(); - let children = ctx - .should_track_children() - .then(|| get_many!(task, Child { task } => task)) - .unwrap_or_default(); let mut queue = AggregationUpdateQueue::new(); diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs index ec3b206927f1d..2b7cbb3a14ef2 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs @@ -363,7 +363,7 @@ macro_rules! get_mut { use $crate::backend::operation::TaskGuard; if let Some($crate::data::CachedDataItemValueRefMut::$key { value, - }) = $task.get_mut(&$crate::data::CachedDataItemKey::$key $input).as_mut() { + }) = $task.get_mut(&$crate::data::CachedDataItemKey::$key $input) { let () = $crate::data::allow_mut_access::$key; Some(value) } else { diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index ebed696d5e1ea..2adf6fdfd2c5d 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -1,5 +1,6 @@ use std::cmp::Ordering; +use rustc_hash::FxHashSet; use serde::{Deserialize, Serialize}; use turbo_tasks::{ event::{Event, EventListener}, @@ -299,19 +300,23 @@ pub enum RootType { OnceTask, } +#[derive(Debug)] +pub struct InProgressStateInner { + pub stale: bool, + #[allow(dead_code)] + pub once_task: bool, + pub session_dependent: bool, + pub marked_as_completed: bool, + pub done_event: Event, + /// Children that should be connected to the task and have their active_count decremented + /// once the task completes. + pub new_children: FxHashSet, +} + #[derive(Debug)] pub enum InProgressState { - Scheduled { - done_event: Event, - }, - InProgress { - stale: bool, - #[allow(dead_code)] - once_task: bool, - session_dependent: bool, - marked_as_completed: bool, - done_event: Event, - }, + Scheduled { done_event: Event }, + InProgress(Box), } transient_traits!(InProgressState); @@ -472,11 +477,6 @@ pub enum CachedDataItem { target: CollectiblesRef, value: (), }, - #[serde(skip)] - OutdatedChild { - task: TaskId, - value: (), - }, // Transient Error State #[serde(skip)] @@ -518,7 +518,6 @@ impl CachedDataItem { CachedDataItem::OutdatedOutputDependency { .. } => false, CachedDataItem::OutdatedCellDependency { .. } => false, CachedDataItem::OutdatedCollectiblesDependency { .. } => false, - CachedDataItem::OutdatedChild { .. } => false, CachedDataItem::Error { .. } => false, } } @@ -572,7 +571,6 @@ impl CachedDataItem { | Self::OutdatedOutputDependency { .. } | Self::OutdatedCellDependency { .. } | Self::OutdatedCollectiblesDependency { .. } - | Self::OutdatedChild { .. } | Self::InProgressCell { .. } | Self::InProgress { .. } | Self::Error { .. } @@ -614,7 +612,6 @@ impl CachedDataItemKey { CachedDataItemKey::OutdatedOutputDependency { .. } => false, CachedDataItemKey::OutdatedCellDependency { .. } => false, CachedDataItemKey::OutdatedCollectiblesDependency { .. } => false, - CachedDataItemKey::OutdatedChild { .. } => false, CachedDataItemKey::Error { .. } => false, } } @@ -656,7 +653,6 @@ impl CachedDataItemType { | Self::OutdatedOutputDependency { .. } | Self::OutdatedCellDependency { .. } | Self::OutdatedCollectiblesDependency { .. } - | Self::OutdatedChild { .. } | Self::InProgressCell { .. } | Self::InProgress { .. } | Self::Error { .. } diff --git a/turbopack/crates/turbo-tasks-backend/src/lib.rs b/turbopack/crates/turbo-tasks-backend/src/lib.rs index 1d3fef0f365ae..d41dce80a55a1 100644 --- a/turbopack/crates/turbo-tasks-backend/src/lib.rs +++ b/turbopack/crates/turbo-tasks-backend/src/lib.rs @@ -1,6 +1,7 @@ #![feature(anonymous_lifetime_in_impl_trait)] #![feature(associated_type_defaults)] #![feature(iter_collect_into)] +#![feature(box_patterns)] mod backend; mod backing_storage;