Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Turbopack] connect children in a batch when task is about to finish #74773

Draft
wants to merge 8 commits into
base: canary
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 91 additions & 64 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ pub use self::{operation::AnyOperation, storage::TaskDataCategory};
use crate::{
backend::{
operation::{
get_aggregation_number, is_root_node, AggregatedDataUpdate, AggregationUpdateJob,
AggregationUpdateQueue, CleanupOldEdgesOperation, ConnectChildOperation,
ExecuteContext, ExecuteContextImpl, Operation, OutdatedEdge, TaskDirtyCause, TaskGuard,
connect_children, get_aggregation_number, is_root_node, AggregatedDataUpdate,
AggregationUpdateJob, AggregationUpdateQueue, CleanupOldEdgesOperation,
ConnectChildOperation, ExecuteContext, ExecuteContextImpl, Operation, OutdatedEdge,
TaskDirtyCause, TaskGuard,
},
persisted_storage_log::PersistedStorageLog,
storage::{get, get_many, get_mut, get_mut_or_insert_with, iter_many, remove, Storage},
Expand All @@ -50,7 +51,8 @@ use crate::{
data::{
ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
CachedDataItemValue, CachedDataItemValueRef, CachedDataUpdate, CellRef, CollectibleRef,
CollectiblesRef, DirtyState, InProgressCellState, InProgressState, OutputValue, RootType,
CollectiblesRef, DirtyState, InProgressCellState, InProgressState, InProgressStateInner,
OutputValue, RootType,
},
utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc, sharded::Sharded},
};
Expand Down Expand Up @@ -395,21 +397,31 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
let mut ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task_id, TaskDataCategory::All);

if let Some(in_progress) = get!(task, InProgress) {
match in_progress {
InProgressState::Scheduled { done_event, .. }
| InProgressState::InProgress { done_event, .. } => {
let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
let listener = done_event.listen_with_note(move || {
if let Some(reader_desc) = reader_desc.as_ref() {
format!("try_read_task_output from {}", reader_desc())
} else {
"try_read_task_output (untracked)".to_string()
}
});
return Ok(Err(listener));
/// Checks whether `task` currently carries an `InProgress` data item and, if
/// so, registers a listener on its `done_event`.
///
/// * `this` - backend used to build a description closure for `reader`.
/// * `task` - guard of the task whose output is being read.
/// * `reader` - the reading task, if any; only used for the listener note.
///
/// Returns `Some(Ok(Err(listener)))` when the task is in the `Scheduled` or
/// `InProgress` state, and `None` when it has no `InProgress` item.
fn check_in_progress<B: BackingStorage>(
    this: &TurboTasksBackendInner<B>,
    task: &impl TaskGuard,
    reader: Option<TaskId>,
) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
{
    // No `InProgress` item: nothing to wait for.
    let state = get!(task, InProgress)?;
    match state {
        InProgressState::Scheduled { done_event, .. }
        | InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
            // Build the description closure up front; it is only invoked when the
            // listener note is actually rendered.
            let describe_reader = reader.map(|r| this.get_task_desc_fn(r));
            let listener = done_event.listen_with_note(move || match describe_reader.as_ref() {
                Some(describe) => format!("try_read_task_output from {}", describe()),
                None => "try_read_task_output (untracked)".to_string(),
            });
            Some(Ok(Err(listener)))
        }
    }
}

if self.should_track_children() && matches!(consistency, ReadConsistency::Strong) {
Expand Down Expand Up @@ -489,6 +501,10 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
}
}

if let Some(value) = check_in_progress(self, &task, reader) {
return value;
}

if let Some(output) = get!(task, Output) {
let result = match output {
OutputValue::Cell(cell) => Some(Ok(Ok(RawVc::TaskCell(cell.task, cell.cell)))),
Expand Down Expand Up @@ -992,40 +1008,17 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
return None;
};
task.add_new(CachedDataItem::InProgress {
value: InProgressState::InProgress {
value: InProgressState::InProgress(Box::new(InProgressStateInner {
stale: false,
once_task,
done_event,
session_dependent: false,
marked_as_completed: false,
},
new_children: Default::default(),
})),
});

if self.should_track_children() {
// Make all current children outdated (remove left-over outdated children)
enum Child {
Current(TaskId),
Outdated(TaskId),
}
let children = iter_many!(task, Child { task } => Child::Current(task))
.chain(iter_many!(task, OutdatedChild { task } => Child::Outdated(task)))
.collect::<Vec<_>>();
for child in children {
match child {
Child::Current(child) => {
let _ = task.add(CachedDataItem::OutdatedChild {
task: child,
value: (),
});
}
Child::Outdated(child) => {
if !task.has_key(&CachedDataItemKey::Child { task: child }) {
task.remove(&CachedDataItemKey::OutdatedChild { task: child });
}
}
}
}

// Make all current collectibles outdated (remove left-over outdated collectibles)
enum Collectible {
Current(CollectibleRef, i32),
Expand Down Expand Up @@ -1192,24 +1185,43 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
) -> bool {
let mut ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task_id, TaskDataCategory::All);
let Some(in_progress) = get!(task, InProgress) else {
let Some(in_progress) = get_mut!(task, InProgress) else {
panic!("Task execution completed, but task is not in progress: {task:#?}");
};
let &InProgressState::InProgress { stale, .. } = in_progress else {
let &mut InProgressState::InProgress(box InProgressStateInner {
stale,
ref mut new_children,
..
}) = in_progress
else {
panic!("Task execution completed, but task is not in progress: {task:#?}");
};

// If the task is stale, reschedule it
if stale {
let Some(InProgressState::InProgress { done_event, .. }) = remove!(task, InProgress)
let Some(InProgressState::InProgress(box InProgressStateInner {
done_event,
new_children,
..
})) = remove!(task, InProgress)
else {
unreachable!();
};
task.add_new(CachedDataItem::InProgress {
value: InProgressState::Scheduled { done_event },
});

// All `new_children` are currently held active with an active count and we need to undo
// that.
AggregationUpdateQueue::run(
AggregationUpdateJob::DecreaseActiveCounts {
task_ids: new_children.into_iter().collect(),
},
&mut ctx,
);
return true;
}
let mut new_children = take(new_children);

// TODO handle stateful
let _ = stateful;
Expand All @@ -1236,9 +1248,24 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type });
}

let mut removed_data: Vec<CachedDataItem> = Vec::new();
let mut queue = AggregationUpdateQueue::new();

let mut removed_data = Vec::new();
let mut old_edges = Vec::new();

// Connect children
{
for old_child in iter_many!(task, Child { task } => task) {
if !new_children.remove(&old_child) {
old_edges.push(OutdatedEdge::Child(old_child));
}
}

let active_count =
get!(task, Activeness).map_or(0, |activeness| activeness.active_counter);
connect_children(task_id, &mut task, new_children, &mut queue, active_count);
}

// Remove no longer existing cells and notify in progress cells
// find all outdated data items (removed cells, outdated edges)
removed_data.extend(
Expand All @@ -1263,12 +1290,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
}));
if self.should_track_children() {
old_edges.extend(task.iter(CachedDataItemType::OutdatedChild).filter_map(
|(key, _)| match key {
CachedDataItemKey::OutdatedChild { task } => Some(OutdatedEdge::Child(task)),
_ => None,
},
));
old_edges.extend(
task.iter(CachedDataItemType::OutdatedCollectible)
.filter_map(|(key, value)| match (key, value) {
Expand Down Expand Up @@ -1301,10 +1322,13 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {

drop(task);

// Remove outdated edges first, before removing in_progress+dirty flag.
// We need to make sure all outdated edges are removed before the task can potentially be
// scheduled and executed again
CleanupOldEdgesOperation::run(task_id, old_edges, &mut ctx);
{
let _span = tracing::trace_span!("CleanupOldEdgesOperation").entered();
// Remove outdated edges first, before removing in_progress+dirty flag.
// We need to make sure all outdated edges are removed before the task can potentially
// be scheduled and executed again
CleanupOldEdgesOperation::run(task_id, old_edges, queue, &mut ctx);
}

// When restoring from persistent caching the following might not be executed (since we can
// suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
Expand All @@ -1314,16 +1338,18 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
let Some(in_progress) = remove!(task, InProgress) else {
panic!("Task execution completed, but task is not in progress: {task:#?}");
};
let InProgressState::InProgress {
let InProgressState::InProgress(box InProgressStateInner {
done_event,
once_task: _,
stale,
session_dependent,
marked_as_completed: _,
} = in_progress
new_children,
}) = in_progress
else {
panic!("Task execution completed, but task is not in progress: {task:#?}");
};
debug_assert!(new_children.is_empty());

// If the task is stale, reschedule it
if stale {
Expand Down Expand Up @@ -1663,9 +1689,10 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
) {
let mut ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task, TaskDataCategory::Data);
if let Some(InProgressState::InProgress {
session_dependent, ..
}) = get_mut!(task, InProgress)
if let Some(InProgressState::InProgress(box InProgressStateInner {
session_dependent,
..
})) = get_mut!(task, InProgress)
{
*session_dependent = true;
}
Expand All @@ -1678,10 +1705,10 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
) {
let mut ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task, TaskDataCategory::Data);
if let Some(InProgressState::InProgress {
if let Some(InProgressState::InProgress(box InProgressStateInner {
marked_as_completed,
..
}) = get_mut!(task, InProgress)
})) = get_mut!(task, InProgress)
{
*marked_as_completed = true;
// TODO this should remove the dirty state (also check session_dependent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ pub enum OutdatedEdge {
}

impl CleanupOldEdgesOperation {
pub fn run(task_id: TaskId, outdated: Vec<OutdatedEdge>, ctx: &mut impl ExecuteContext) {
let queue = AggregationUpdateQueue::new();
pub fn run(
task_id: TaskId,
outdated: Vec<OutdatedEdge>,
queue: AggregationUpdateQueue,
ctx: &mut impl ExecuteContext,
) {
CleanupOldEdgesOperation::RemoveEdges {
task_id,
outdated,
Expand Down
Loading
Loading