From 22bcb92108413e4c89e950318a55f83532e49ecc Mon Sep 17 00:00:00 2001 From: Kevin Reid Date: Sat, 10 Feb 2024 22:09:15 -0800 Subject: [PATCH 1/4] Add `JoinHandle::into_join_future()`. This allows spawned threads to be incorporated into `Future`-based concurrency control without needing to add separate result-reporting channels and an additional layer of `catch_unwind()` to the thread functions. I believe this will be useful to async/blocking interop and for various applications which want to manage parallel tasks in a lightweight way. There is a small additional cost which is paid even if the mechanism is unused: the algorithm built into the shutdown of a spawned thread must obtain and invoke a `Waker`, and the `Packet` internal struct is larger by one `Mutex`. In the future, this `Mutex` should be replaced by something equivalent to `futures::task::AtomicWaker`, which will be more efficient and eliminate deadlock and blocking hazards, but `std` doesn't contain one of those yet. This is not an `impl IntoFuture for JoinHandle` so that it can avoid being insta-stable; particularly because during the design discussion, concerns were raised that a proper implementation should obey structured concurrency via an `AsyncDrop` that forces waiting for the thread. I personally think that would be a mistake, and structured spawning should be its own thing, but this choice of API permits either option in the future by keeping everything unstable, where a trait implementation would not. --- library/std/src/lib.rs | 1 + library/std/src/thread/mod.rs | 186 ++++++++++++++++++++++++++++--- library/std/src/thread/scoped.rs | 22 +++- library/std/src/thread/tests.rs | 48 +++++++- 4 files changed, 240 insertions(+), 17 deletions(-) diff --git a/library/std/src/lib.rs b/library/std/src/lib.rs index 5b94f036248cb..816bad2eb7103 100644 --- a/library/std/src/lib.rs +++ b/library/std/src/lib.rs @@ -348,6 +348,7 @@ #![feature(lazy_get)] #![feature(maybe_uninit_slice)] #![feature(maybe_uninit_write_slice)] +#![feature(noop_waker)] #![feature(panic_can_unwind)] #![feature(panic_internals)] #![feature(pin_coerce_unsized_trait)] diff --git a/library/std/src/thread/mod.rs b/library/std/src/thread/mod.rs index 227ee9d64f375..7cbf90527217e 100644 --- a/library/std/src/thread/mod.rs +++ b/library/std/src/thread/mod.rs @@ -164,17 +164,18 @@ use core::mem::MaybeUninit; use crate::any::Any; use crate::cell::UnsafeCell; +use crate::future::Future; use crate::marker::PhantomData; use crate::mem::{self, ManuallyDrop, forget}; use crate::num::NonZero; use crate::pin::Pin; -use crate::sync::Arc; use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::{Arc, Mutex, PoisonError}; use crate::sys::sync::Parker; use crate::sys::thread as imp; use crate::sys_common::{AsInner, IntoInner}; use crate::time::{Duration, Instant}; -use crate::{env, fmt, io, panic, panicking, str}; +use crate::{env, fmt, io, panic, panicking, str, task}; #[stable(feature = "scoped_threads", since = "1.63.0")] mod scoped; @@ -490,6 +491,7 @@ impl Builder { let my_packet: Arc> = Arc::new(Packet { scope: scope_data, result: UnsafeCell::new(None), + waker: Mutex::new(task::Waker::noop().clone()), _marker: PhantomData, }); let their_packet = my_packet.clone(); @@ -540,15 +542,35 @@ impl Builder { let try_result = panic::catch_unwind(panic::AssertUnwindSafe(|| { crate::sys::backtrace::__rust_begin_short_backtrace(f) })); + + // Store the `Result` of the thread that the `JoinHandle` can retrieve. + // // SAFETY: `their_packet` as been built just above and moved by the // closure (it is an Arc<...>) and `my_packet` will be stored in the // same `JoinInner` as this closure meaning the mutation will be // safe (not modify it and affect a value far away). unsafe { *their_packet.result.get() = Some(try_result) }; - // Here `their_packet` gets dropped, and if this is the last `Arc` for that packet that - // will call `decrement_num_running_threads` and therefore signal that this thread is - // done. + + // Fetch the `Waker` from the packet; this is needed to support `.into_join_future()`. + // If unused, this just returns `Waker::noop()` which will do nothing. + let waker: task::Waker = { + let placeholder = task::Waker::noop().clone(); + let mut guard = their_packet.waker.lock().unwrap_or_else(PoisonError::into_inner); + mem::replace(&mut *guard, placeholder) + }; + + // Here `their_packet` gets dropped, and if this is the last `Arc` for that packet + // (which happens if the `JoinHandle` has been dropped) that will call + // `decrement_num_running_threads` and therefore signal to the scope (if there is one) + // that this thread is done. drop(their_packet); + + // Now that we have become visibly “finished” by dropping the packet + // (`JoinInner::is_finished` will return true), we can use the `Waker` to signal + // any waiting `JoinFuture`. If instead we are being waited for by + // `JoinHandle::join()`, the actual platform thread termination will be the wakeup. + waker.wake(); + // Here, the lifetime `'scope` can end. `main` keeps running for a bit // after that before returning itself. }; @@ -1192,8 +1214,6 @@ impl ThreadId { } } } else { - use crate::sync::{Mutex, PoisonError}; - static COUNTER: Mutex = Mutex::new(0); let mut counter = COUNTER.lock().unwrap_or_else(PoisonError::into_inner); @@ -1635,16 +1655,30 @@ impl fmt::Debug for Thread { #[stable(feature = "rust1", since = "1.0.0")] pub type Result = crate::result::Result>; -// This packet is used to communicate the return value between the spawned -// thread and the rest of the program. It is shared through an `Arc` and -// there's no need for a mutex here because synchronization happens with `join()` -// (the caller will never read this packet until the thread has exited). -// -// An Arc to the packet is stored into a `JoinInner` which in turns is placed -// in `JoinHandle`. +/// This packet is used to communicate the return value between the spawned +/// thread and the rest of the program. It is shared through an [`Arc`]. +/// +/// An Arc to the packet is stored into a [`JoinInner`] which in turn is placed +/// in [`JoinHandle`] or [`ScopedJoinHandle`]. struct Packet<'scope, T> { + /// Communication with the enclosing thread scope if there is one. scope: Option>, + + /// Holds the return value. + /// + /// Synchronization happens via reference counting: as long as the `Arc` + /// has two or more references, this field is never read, and will only be written + /// once as the thread terminates. After that happens, either the packet is dropped, + /// or [`JoinInner::join()`] will `take()` the result value from here. result: UnsafeCell>>, + + /// If a [`JoinFuture`] for this thread exists and has been polled, + /// this is the waker from that poll. If it does not exist or has not + /// been polled yet, this is [`task::Waker::noop()`]. + // FIXME: This should be an `AtomicWaker` instead of a `Mutex`, + // to be cheaper and impossible to deadlock. + waker: Mutex, + _marker: PhantomData>, } @@ -1698,6 +1732,10 @@ impl<'scope, T> JoinInner<'scope, T> { self.native.join(); Arc::get_mut(&mut self.packet).unwrap().result.get_mut().take().unwrap() } + + fn is_finished(&self) -> bool { + Arc::strong_count(&self.packet) == 1 + } } /// An owned permission to join on a thread (block on its termination). @@ -1844,6 +1882,45 @@ impl JoinHandle { self.0.join() } + /// Returns a [`Future`] that resolves when the thread has finished. + /// + /// Its [output](Future::Output) value is identical to that of [`JoinHandle::join()`]; + /// this is the `async` equivalent of that blocking function. + /// + /// If the returned future is dropped (cancelled), the thread will become *detached*; + /// there will be no way to observe or wait for the thread’s termination. + /// This is identical to the behavior of `JoinHandle` itself. + /// + /// # Example + /// + // FIXME: ideally we would actually run this example, with the help of a trivial async executor + /// ```no_run + /// #![feature(thread_join_future)] + /// use std::thread; + /// + /// async fn do_some_heavy_tasks_in_parallel() -> thread::Result<()> { + /// let future_1 = thread::spawn(|| { + /// // ... do something ... + /// }).into_join_future(); + /// let future_2 = thread::spawn(|| { + /// // ... do something else ... + /// }).into_join_future(); + /// + /// // Both threads have been started; now await the completion of both. + /// future_1.await?; + /// future_2.await?; + /// Ok(()) + /// } + /// ``` + #[unstable(feature = "thread_join_future", issue = "none")] + pub fn into_join_future(self) -> JoinFuture<'static, T> { + // The method is not named `into_future()` to avoid overlapping with the stable + // `IntoFuture::into_future()`. We're not implementing `IntoFuture` in order to + // keep this unstable and preserve the *option* of compatibly making this obey structured + // concurrency via an async-Drop that waits for the thread to end. + JoinFuture::new(self.0) + } + /// Checks if the associated thread has finished running its main function. /// /// `is_finished` supports implementing a non-blocking join operation, by checking @@ -1856,7 +1933,7 @@ impl JoinHandle { /// to return quickly, without blocking for any significant amount of time. #[stable(feature = "thread_is_running", since = "1.61.0")] pub fn is_finished(&self) -> bool { - Arc::strong_count(&self.0.packet) == 1 + self.0.is_finished() } } @@ -1882,9 +1959,88 @@ impl fmt::Debug for JoinHandle { fn _assert_sync_and_send() { fn _assert_both() {} _assert_both::>(); + _assert_both::>(); _assert_both::(); } +/// A [`Future`] that resolves when a thread has finished. +/// +/// Its [output](Future::Output) value is identical to that of [`JoinHandle::join()`]; +/// this is the `async` equivalent of that blocking function. +/// Obtain it by calling [`JoinHandle::into_join_future()`] or +/// [`ScopedJoinHandle::into_join_future()`]. +/// +/// If a `JoinFuture` is dropped (cancelled), and the thread does not belong to a [scope], +/// the associated thread will become *detached*; +/// there will be no way to observe or wait for the thread’s termination. +#[unstable(feature = "thread_join_future", issue = "none")] +pub struct JoinFuture<'scope, T>(Option>); + +impl<'scope, T> JoinFuture<'scope, T> { + fn new(inner: JoinInner<'scope, T>) -> Self { + Self(Some(inner)) + } + + /// Implements the “getting a result” part of joining/polling, without blocking or changing + /// the `Waker`. Part of the implementation of `poll()`. + /// + /// If this returns `Some`, then `self.0` is now `None` and the future will panic + /// if polled again. + fn take_result(&mut self) -> Option> { + self.0.take_if(|i| i.is_finished()).map(JoinInner::join) + } +} + +#[unstable(feature = "thread_join_future", issue = "none")] +impl Future for JoinFuture<'_, T> { + type Output = Result; + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { + if let Some(result) = self.take_result() { + return task::Poll::Ready(result); + } + + // Update the `Waker` the thread should wake when it completes. + { + let Some(inner) = &mut self.0 else { + panic!("polled after complete"); + }; + + let new_waker = cx.waker(); + + // Lock the mutex, and ignore the poison state because there are no meaningful ways + // the existing contents can be corrupted; they will be overwritten completely and the + // overwrite is atomic-in-the-database-sense. + let mut current_waker_guard = + inner.packet.waker.lock().unwrap_or_else(PoisonError::into_inner); + + // Overwrite the waker. Note that we are executing the new waker’s clone and the old + // waker’s destructor; these could panic (which will merely poison the lock) or hang, + // which will hold the lock, but the most that can do is prevent the thread from + // exiting because it's trying to acquire `packet.waker`, which it won't do while + // holding any *other* locks (...unless the thread’s data includes a lock guard that + // the waker also wants). + if !new_waker.will_wake(&*current_waker_guard) { + *current_waker_guard = new_waker.clone(); + } + } + + // Check for completion again in case the thread finished while we were busy + // setting the waker, to prevent a lost wakeup in that case. + if let Some(result) = self.take_result() { + task::Poll::Ready(result) + } else { + task::Poll::Pending + } + } +} + +#[unstable(feature = "thread_join_future", issue = "none")] +impl fmt::Debug for JoinFuture<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("JoinHandle").finish_non_exhaustive() + } +} + /// Returns an estimate of the default amount of parallelism a program should use. /// /// Parallelism is a resource. A given machine provides a certain capacity for diff --git a/library/std/src/thread/scoped.rs b/library/std/src/thread/scoped.rs index b2305b1eda7e1..e4978dc5df603 100644 --- a/library/std/src/thread/scoped.rs +++ b/library/std/src/thread/scoped.rs @@ -313,6 +313,26 @@ impl<'scope, T> ScopedJoinHandle<'scope, T> { self.0.join() } + /// Returns a [`Future`] that resolves when the thread has finished. + /// + /// Its [output] value is identical to that of [`ScopedJoinHandle::join()`]; + /// this is the `async` equivalent of that blocking function. + /// + /// Note that while this function allows waiting for a scoped thread from `async` + /// functions, the original [`scope()`] is still a blocking function which should + /// not be used in `async` functions. + /// + /// [`Future`]: crate::future::Future + /// [output]: crate::future::Future::Output + #[unstable(feature = "thread_join_future", issue = "none")] + pub fn into_join_future(self) -> super::JoinFuture<'scope, T> { + // There is no `ScopedJoinFuture` because the only difference between `JoinHandle` + // and `ScopedJoinHandle` is that `JoinHandle` has no lifetime parameter, because + // it was introduced before scoped threads. `JoinFuture` is new enough that we don’t + // need to make two versions of it. + super::JoinFuture::new(self.0) + } + /// Checks if the associated thread has finished running its main function. /// /// `is_finished` supports implementing a non-blocking join operation, by checking @@ -325,7 +345,7 @@ impl<'scope, T> ScopedJoinHandle<'scope, T> { /// to return quickly, without blocking for any significant amount of time. #[stable(feature = "scoped_threads", since = "1.63.0")] pub fn is_finished(&self) -> bool { - Arc::strong_count(&self.0.packet) == 1 + self.0.is_finished() } } diff --git a/library/std/src/thread/tests.rs b/library/std/src/thread/tests.rs index ff45e82bd9c71..849e5479676e8 100644 --- a/library/std/src/thread/tests.rs +++ b/library/std/src/thread/tests.rs @@ -1,12 +1,14 @@ use super::Builder; use crate::any::Any; +use crate::assert_matches::assert_matches; +use crate::future::Future as _; use crate::panic::panic_any; use crate::sync::atomic::{AtomicBool, Ordering}; use crate::sync::mpsc::{Sender, channel}; use crate::sync::{Arc, Barrier}; use crate::thread::{self, Scope, ThreadId}; use crate::time::{Duration, Instant}; -use crate::{mem, result}; +use crate::{mem, result, task}; // !!! These tests are dangerous. If something is buggy, they will hang, !!! // !!! instead of exiting cleanly. This might wedge the buildbots. !!! @@ -410,3 +412,47 @@ fn test_minimal_thread_stack() { assert_eq!(before, 0); assert_eq!(COUNT.load(Ordering::Relaxed), 1); } + +fn join_future_test(scoped: bool) { + /// Simple `Waker` implementation. + /// If `std` ever gains a `block_on()`, we can consider replacing this with that. + struct MyWaker(Sender<()>); + impl task::Wake for MyWaker { + fn wake(self: Arc) { + _ = self.0.send(()); + } + } + + // Communication setup. + let (thread_delay_tx, thread_delay_rx) = channel(); + let (waker_tx, waker_rx) = channel(); + let waker = task::Waker::from(Arc::new(MyWaker(waker_tx))); + let ctx = &mut task::Context::from_waker(&waker); + + thread::scope(|s| { + // Create the thread and the future under test + let thread_body = move || { + thread_delay_rx.recv().unwrap(); + "hello" + }; + let mut future = crate::pin::pin!(if scoped { + s.spawn(thread_body).into_join_future() + } else { + thread::spawn(thread_body).into_join_future() + }); + + // Actual test + assert_matches!(future.as_mut().poll(ctx), task::Poll::Pending); + thread_delay_tx.send(()).unwrap(); // Unblock the thread + waker_rx.recv().unwrap(); // Wait for waking (as an executor would) + assert_matches!(future.as_mut().poll(ctx), task::Poll::Ready(Ok("hello"))); + }); +} +#[test] +fn join_future_unscoped() { + join_future_test(false) +} +#[test] +fn join_future_scoped() { + join_future_test(true) +} From 2db90bb997a62f63851ac35ba57b30095466921c Mon Sep 17 00:00:00 2001 From: Kevin Reid Date: Wed, 13 Nov 2024 20:50:47 -0800 Subject: [PATCH 2/4] Document that the thread may still be running. --- library/std/src/thread/mod.rs | 25 ++++++++++++++++++------- library/std/src/thread/scoped.rs | 12 +++++++++--- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/library/std/src/thread/mod.rs b/library/std/src/thread/mod.rs index 7cbf90527217e..751f55572a316 100644 --- a/library/std/src/thread/mod.rs +++ b/library/std/src/thread/mod.rs @@ -1885,11 +1885,16 @@ impl JoinHandle { /// Returns a [`Future`] that resolves when the thread has finished. /// /// Its [output](Future::Output) value is identical to that of [`JoinHandle::join()`]; - /// this is the `async` equivalent of that blocking function. + /// this is the approximate `async` equivalent of that blocking function. /// - /// If the returned future is dropped (cancelled), the thread will become *detached*; - /// there will be no way to observe or wait for the thread’s termination. - /// This is identical to the behavior of `JoinHandle` itself. + /// # Details + /// + /// * If the returned future is dropped (cancelled), the thread will become *detached*; + /// there will be no way to observe or wait for the thread’s termination. + /// This is identical to the behavior of `JoinHandle` itself. + /// + /// * Unlike [`JoinHandle::join()`], the thread may still exist when the future resolves. + /// In particular, it may still be executing destructors for thread-local values. /// /// # Example /// @@ -1970,9 +1975,15 @@ fn _assert_sync_and_send() { /// Obtain it by calling [`JoinHandle::into_join_future()`] or /// [`ScopedJoinHandle::into_join_future()`]. /// -/// If a `JoinFuture` is dropped (cancelled), and the thread does not belong to a [scope], -/// the associated thread will become *detached*; -/// there will be no way to observe or wait for the thread’s termination. +/// # Behavior details +/// +/// * If a `JoinFuture` is dropped (cancelled), and the thread does not belong to a [scope], +/// the associated thread will become *detached*; +/// there will be no way to observe or wait for the thread’s termination. +/// +/// * Unlike [`JoinHandle::join()`], the thread may still exist when the future resolves. +/// In particular, it may still be executing destructors for thread-local values. +/// #[unstable(feature = "thread_join_future", issue = "none")] pub struct JoinFuture<'scope, T>(Option>); diff --git a/library/std/src/thread/scoped.rs b/library/std/src/thread/scoped.rs index e4978dc5df603..953cfef72c8e2 100644 --- a/library/std/src/thread/scoped.rs +++ b/library/std/src/thread/scoped.rs @@ -318,12 +318,18 @@ impl<'scope, T> ScopedJoinHandle<'scope, T> { /// Its [output] value is identical to that of [`ScopedJoinHandle::join()`]; /// this is the `async` equivalent of that blocking function. /// - /// Note that while this function allows waiting for a scoped thread from `async` - /// functions, the original [`scope()`] is still a blocking function which should - /// not be used in `async` functions. + /// # Behavior details + /// + /// * Unlike [`JoinHandle::join()`], the thread may still exist when the future resolves. + /// In particular, it may still be executing destructors for thread-local values. + /// + /// * While this function allows waiting for a scoped thread from `async` + /// functions, the original [`scope()`] is still a blocking function which should + /// not be used in `async` functions. /// /// [`Future`]: crate::future::Future /// [output]: crate::future::Future::Output + /// [`JoinHandle::join()`]: super::JoinHandle::join() #[unstable(feature = "thread_join_future", issue = "none")] pub fn into_join_future(self) -> super::JoinFuture<'scope, T> { // There is no `ScopedJoinFuture` because the only difference between `JoinHandle` From 3cb2c66a64f1e5ec7a7ef1677b5d349469246e99 Mon Sep 17 00:00:00 2001 From: Kevin Reid Date: Tue, 26 Nov 2024 09:54:14 -0800 Subject: [PATCH 3/4] Revert claim that the thread will be running; document the actual blocking behavior but without guarantees. --- library/std/src/thread/mod.rs | 34 +++++++++++++++++++++++++------- library/std/src/thread/scoped.rs | 23 ++++++++++++++------- 2 files changed, 43 insertions(+), 14 deletions(-) diff --git a/library/std/src/thread/mod.rs b/library/std/src/thread/mod.rs index 751f55572a316..7cb62ce6021b7 100644 --- a/library/std/src/thread/mod.rs +++ b/library/std/src/thread/mod.rs @@ -1885,16 +1885,25 @@ impl JoinHandle { /// Returns a [`Future`] that resolves when the thread has finished. /// /// Its [output](Future::Output) value is identical to that of [`JoinHandle::join()`]; - /// this is the approximate `async` equivalent of that blocking function. + /// this is the `async` equivalent of that blocking function. /// /// # Details /// - /// * If the returned future is dropped (cancelled), the thread will become *detached*; + /// * If the returned [`JoinFuture`] is dropped (cancelled), the thread will become *detached*; /// there will be no way to observe or wait for the thread’s termination. /// This is identical to the behavior of `JoinHandle` itself. /// - /// * Unlike [`JoinHandle::join()`], the thread may still exist when the future resolves. - /// In particular, it may still be executing destructors for thread-local values. + /// * The returned [`JoinFuture`] wakes when the thread has produced its result value or + /// panicked; however, as currently implemented, the thread may still be executing cleanup, + /// including destructors for [thread-local](crate::thread_local) data. + /// Polling the future may block until this cleanup completes, equivalently to if you called + /// [`JoinHandle::join()`] after observing that [`JoinHandle::is_finished()`] returned true. + /// This should usually be insignificant, but it is *possible* for thread locals to cause + /// this future to block arbitrarily long. + /// + /// This behavior is not currently guaranteed; future versions of Rust may instead have the + /// future not block at all (the thread may still be running) or not wake until the thread + /// has terminated. /// /// # Example /// @@ -1981,9 +1990,17 @@ fn _assert_sync_and_send() { /// the associated thread will become *detached*; /// there will be no way to observe or wait for the thread’s termination. /// -/// * Unlike [`JoinHandle::join()`], the thread may still exist when the future resolves. -/// In particular, it may still be executing destructors for thread-local values. -/// +/// * `JoinFuture` wakes when the thread has produced its result value or +/// panicked; however, as currently implemented, the thread may still be executing cleanup, +/// including destructors for [thread-local](crate::thread_local) data. +/// Polling the future may block until this cleanup completes, equivalently to if you called +/// [`JoinHandle::join()`] after observing that [`JoinHandle::is_finished()`] returned true. +/// This should usually be insignificant, but it is *possible* for thread locals to cause +/// this future to block arbitrarily long. +/// +/// This behavior is not currently guaranteed; future versions of Rust may instead have the +/// future not block at all (the thread may still be running) or not wake until the thread +/// has terminated. #[unstable(feature = "thread_join_future", issue = "none")] pub struct JoinFuture<'scope, T>(Option>); @@ -1997,6 +2014,9 @@ impl<'scope, T> JoinFuture<'scope, T> { /// /// If this returns `Some`, then `self.0` is now `None` and the future will panic /// if polled again. + /// + /// Note that this calls the actual `join` operation, and as a consequence will block until + /// TLS destructors complete. fn take_result(&mut self) -> Option> { self.0.take_if(|i| i.is_finished()).map(JoinInner::join) } diff --git a/library/std/src/thread/scoped.rs b/library/std/src/thread/scoped.rs index 953cfef72c8e2..687efdd821875 100644 --- a/library/std/src/thread/scoped.rs +++ b/library/std/src/thread/scoped.rs @@ -1,4 +1,4 @@ -use super::{Builder, JoinInner, Result, Thread, current, park}; +use super::{Builder, JoinFuture, JoinInner, Result, Thread, current, park}; use crate::marker::PhantomData; use crate::panic::{AssertUnwindSafe, catch_unwind, resume_unwind}; use crate::sync::Arc; @@ -320,23 +320,32 @@ impl<'scope, T> ScopedJoinHandle<'scope, T> { /// /// # Behavior details /// - /// * Unlike [`JoinHandle::join()`], the thread may still exist when the future resolves. - /// In particular, it may still be executing destructors for thread-local values. + /// * The returned [`JoinFuture`] wakes when the thread has produced its result value or + /// panicked; however, as currently implemented, the thread may still be executing cleanup, + /// including destructors for [thread-local](crate::thread_local) data. + /// Polling the future may block until this cleanup completes, equivalently to if you called + /// [`ScopedJoinHandle::join()`] after observing that [`ScopedJoinHandle::is_finished()`] + /// returned true. + /// This should usually be insignificant, but it is *possible* for thread locals to cause + /// this future to block arbitrarily long. + /// + /// This behavior is not currently guaranteed; future versions of Rust may instead have the + /// future not block at all (the thread may still be running) or not wake until the thread + /// has terminated. /// /// * While this function allows waiting for a scoped thread from `async` /// functions, the original [`scope()`] is still a blocking function which should - /// not be used in `async` functions. + /// not be called from an `async` functions. /// /// [`Future`]: crate::future::Future /// [output]: crate::future::Future::Output - /// [`JoinHandle::join()`]: super::JoinHandle::join() #[unstable(feature = "thread_join_future", issue = "none")] - pub fn into_join_future(self) -> super::JoinFuture<'scope, T> { + pub fn into_join_future(self) -> JoinFuture<'scope, T> { // There is no `ScopedJoinFuture` because the only difference between `JoinHandle` // and `ScopedJoinHandle` is that `JoinHandle` has no lifetime parameter, because // it was introduced before scoped threads. `JoinFuture` is new enough that we don’t // need to make two versions of it. - super::JoinFuture::new(self.0) + JoinFuture::new(self.0) } /// Checks if the associated thread has finished running its main function. From 9bd6904e7a4c859fd30b53ef3f8cdefd67f2db56 Mon Sep 17 00:00:00 2001 From: Kevin Reid Date: Tue, 26 Nov 2024 11:21:46 -0800 Subject: [PATCH 4/4] Use `clone_from()` for waker update. --- library/std/src/thread/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/library/std/src/thread/mod.rs b/library/std/src/thread/mod.rs index 7cb62ce6021b7..dc8bd36d53fd9 100644 --- a/library/std/src/thread/mod.rs +++ b/library/std/src/thread/mod.rs @@ -2050,9 +2050,7 @@ impl Future for JoinFuture<'_, T> { // exiting because it's trying to acquire `packet.waker`, which it won't do while // holding any *other* locks (...unless the thread’s data includes a lock guard that // the waker also wants). - if !new_waker.will_wake(&*current_waker_guard) { - *current_waker_guard = new_waker.clone(); - } + current_waker_guard.clone_from(new_waker); } // Check for completion again in case the thread finished while we were busy