Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp committed Jun 12, 2024
1 parent d88b0f8 commit 46caca5
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
7 changes: 6 additions & 1 deletion crates/polars-stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ impl Executor {
}

pub struct TaskScope<'scope, 'env: 'scope> {
// Keep track of in-progress tasks so we can forcibly cancel them
// when the scope ends, to ensure the lifetimes are respected.
// Tasks add their own key to completed_tasks when done so we can
// reclaim the memory used by the cancel_handles.
cancel_handles: Mutex<SlotMap<TaskKey, CancelHandle>>,
completed_tasks: Arc<Mutex<Vec<TaskKey>>>,

Expand Down Expand Up @@ -266,9 +270,10 @@ impl<'scope, 'env> TaskScope<'scope, 'env> {
let (run, jh) = unsafe {
// SAFETY: we make sure to cancel this task before 'scope ends.
let executor = Executor::global();
let on_wake = move |task| executor.schedule_task(task);
task::spawn_with_lifetime(
fut,
move |task| executor.schedule_task(task),
on_wake,
TaskMetadata {
task_key,
priority,
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-stream/src/executor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,10 @@ where
JoinHandle(Some(task))
}

/// Takes a future and turns it into a runnable task with associated metadata.
///
/// When the task is pending its waker will be set to call schedule
/// with the runnable.
pub unsafe fn spawn_with_lifetime<'a, F, S, M>(
future: F,
schedule: S,
Expand Down

0 comments on commit 46caca5

Please sign in to comment.