Skip to content

Commit

Permalink
[Turbopack] add active_counter to keep tasks active (#74772)
Browse files Browse the repository at this point in the history
Rename AggregateRoot to Activeness and split into both sources

improve readablility

add active_counter as source of activeness

add Debug for DequeSet

<!-- Thanks for opening a PR! Your contribution is much appreciated.
To make sure your PR is handled as smoothly as possible we request that you follow the checklist sections below.
Choose the right checklist for the change(s) that you're making:

## For Contributors

### Improving Documentation

- Run `pnpm prettier-fix` to fix formatting issues before opening the PR.
- Read the Docs Contribution Guide to ensure your contribution follows the docs guidelines: https://nextjs.org/docs/community/contribution-guide

### Adding or Updating Examples

- The "examples guidelines" are followed from our contributing doc https://github.com/vercel/next.js/blob/canary/contributing/examples/adding-examples.md
- Make sure the linting passes by running `pnpm build && pnpm lint`. See https://github.com/vercel/next.js/blob/canary/contributing/repository/linting.md

### Fixing a bug

- Related issues linked using `fixes #number`
- Tests added. See: https://github.com/vercel/next.js/blob/canary/contributing/core/testing.md#writing-tests-for-nextjs
- Errors have a helpful link attached, see https://github.com/vercel/next.js/blob/canary/contributing.md

### Adding a feature

- Implements an existing feature request or RFC. Make sure the feature request has been accepted for implementation before opening a PR. (A discussion must be opened, see https://github.com/vercel/next.js/discussions/new?category=ideas)
- Related issues/discussions are linked using `fixes #number`
- e2e tests added (https://github.com/vercel/next.js/blob/canary/contributing/core/testing.md#writing-tests-for-nextjs)
- Documentation added
- Telemetry added. In case of a feature if it's used or not.
- Errors have a helpful link attached, see https://github.com/vercel/next.js/blob/canary/contributing.md


## For Maintainers

- Minimal description (aim for explaining to someone not on the team to understand the PR)
- When linking to a Slack thread, you might want to share details of the conclusion
- Link both the Linear (Fixes NEXT-xxx) and the GitHub issues
- Add review comments if necessary to explain to the reviewer the logic behind a change

### What?

### Why?

### How?

Closes NEXT-
Fixes #

-->
  • Loading branch information
sokra authored Jan 14, 2025
1 parent 3256ff8 commit c82d85e
Show file tree
Hide file tree
Showing 10 changed files with 389 additions and 71 deletions.
44 changes: 22 additions & 22 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ use crate::{
ExecuteContext, ExecuteContextImpl, Operation, OutdatedEdge, TaskDirtyCause, TaskGuard,
},
persisted_storage_log::PersistedStorageLog,
storage::{get, get_many, get_mut, iter_many, remove, Storage},
storage::{get, get_many, get_mut, get_mut_or_insert_with, iter_many, remove, Storage},
},
backing_storage::BackingStorage,
data::{
ActiveType, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
CachedDataItemValue, CachedDataItemValueRef, CachedDataUpdate, CellRef, CollectibleRef,
CollectiblesRef, DirtyState, InProgressCellState, InProgressState, OutputValue, RootState,
CollectiblesRef, DirtyState, InProgressCellState, InProgressState, OutputValue, RootType,
},
utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc, sharded::Sharded},
};
Expand Down Expand Up @@ -447,7 +447,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
.unwrap_or_default()
.get(self.session_id);
if dirty_tasks > 0 || is_dirty {
let root = get!(task, AggregateRoot);
let root = get!(task, Activeness);
let mut task_ids_to_schedule: Vec<_> = Vec::new();
// When there are dirty task, subscribe to the all_clean_event
let root = if let Some(root) = root {
Expand All @@ -456,10 +456,9 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
// If we don't have a root state, add one. This also makes sure all tasks stay
// active and this task won't stale. CachedActiveUntilClean
// is automatically removed when this task is clean.
task.add_new(CachedDataItem::AggregateRoot {
value: RootState::new(ActiveType::CachedActiveUntilClean, task_id),
});
// A newly added AggregateRoot need to make sure to schedule the tasks
get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id))
.set_active_until_clean();
// A newly added Activeness need to make sure to schedule the tasks
task_ids_to_schedule = get_many!(
task,
AggregatedDirtyContainer {
Expand All @@ -471,7 +470,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
if is_dirty {
task_ids_to_schedule.push(task_id);
}
get!(task, AggregateRoot).unwrap()
get!(task, Activeness).unwrap()
};
let listener = root.all_clean_event.listen_with_note(move || {
format!(
Expand Down Expand Up @@ -1373,10 +1372,11 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
};
if !aggregated_update.is_zero() {
if aggregated_update.get(self.session_id) < 0 {
if let Some(root_state) = get!(task, AggregateRoot) {
if let Some(root_state) = get_mut!(task, Activeness) {
root_state.all_clean_event.notify(usize::MAX);
if matches!(root_state.ty, ActiveType::CachedActiveUntilClean) {
task.remove(&CachedDataItemKey::AggregateRoot {});
root_state.unset_active_until_clean();
if root_state.is_empty() {
task.remove(&CachedDataItemKey::Activeness {});
}
}
}
Expand Down Expand Up @@ -1700,8 +1700,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
let task_id = self.transient_task_id_factory.get();
let root_type = match task_type {
TransientTaskType::Root(_) => ActiveType::RootTask,
TransientTaskType::Once(_) => ActiveType::OnceTask,
TransientTaskType::Root(_) => RootType::RootTask,
TransientTaskType::Once(_) => RootType::OnceTask,
};
self.transient_tasks.insert(
task_id,
Expand All @@ -1719,13 +1719,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
effective: u32::MAX,
},
});
task.add(CachedDataItem::AggregateRoot {
value: RootState::new(root_type, task_id),
task.add(CachedDataItem::Activeness {
value: ActivenessState::new_root(root_type, task_id),
});
task.add(CachedDataItem::new_scheduled(move || match root_type {
ActiveType::RootTask => "Root Task".to_string(),
ActiveType::OnceTask => "Once Task".to_string(),
_ => unreachable!(),
RootType::RootTask => "Root Task".to_string(),
RootType::OnceTask => "Once Task".to_string(),
}));
}
task_id
Expand All @@ -1744,11 +1743,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
dirty_containers.get(self.session_id) > 0
});
if is_dirty || has_dirty_containers {
if let Some(root_state) = get_mut!(task, AggregateRoot) {
if let Some(root_state) = get_mut!(task, Activeness) {
// We will finish the task, but it would be removed after the task is done
root_state.ty = ActiveType::CachedActiveUntilClean;
root_state.unset_root_type();
root_state.set_active_until_clean();
};
} else if let Some(root_state) = remove!(task, AggregateRoot) {
} else if let Some(root_state) = remove!(task, Activeness) {
// Technically nobody should be listening to this event, but just in case
// we notify it anyway
root_state.all_clean_event.notify(usize::MAX);
Expand Down
Loading

0 comments on commit c82d85e

Please sign in to comment.