Skip to content

Commit 2e7d419

Browse files
committed
Multi-thread & Waking edge-cases
- Polling a future from Waker::wake is not expected by some Future libs. - Polling a future form Waker::wake means it will be polled on which ever thread the signal was emitted. Not neccessarely the original thread. - Dropping the future should clean up the signal connection if it wasn't emitted yet.
1 parent 8e1ad54 commit 2e7d419

File tree

1 file changed

+77
-32
lines changed

1 file changed

+77
-32
lines changed

godot-core/src/tools/async_support.rs

Lines changed: 77 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,32 @@ use std::future::Future;
33
use std::pin::Pin;
44
use std::sync::{Arc, Mutex};
55
use std::task::{Context, Poll, Wake, Waker};
6+
use std::thread::{self, ThreadId};
67

78
use crate::builtin::{Callable, Signal, Variant};
89
use crate::classes::object::ConnectFlags;
10+
use crate::classes::Os;
911
use crate::godot_error;
10-
use crate::meta::FromGodot;
12+
use crate::meta::{FromGodot, ToGodot};
1113
use crate::obj::EngineEnum;
1214

1315
pub fn godot_task(future: impl Future<Output = ()> + 'static) {
16+
let os = Os::singleton();
17+
18+
// Spawning new tasks is only allowed on the main thread for now.
19+
// We can not accept Sync + Send futures since all object references (i.e. Gd<T>) are not thread-safe. So a future has to remain on the
20+
// same thread it was created on. Godots signals on the other hand can be emitted on any thread, so it can't be guaranteed on which thread
21+
// a future will be polled.
22+
// By limiting async tasks to the main thread we can redirect all signal callbacks back to the main thread via `call_deferred`.
23+
//
24+
// Once thread-safe futures are possible the restriction can be lifted.
25+
if os.get_thread_caller_id() != os.get_main_thread_id() {
26+
return;
27+
}
28+
1429
let waker: Waker = ASYNC_RUNTIME.with_borrow_mut(move |rt| {
1530
let task_index = rt.add_task(Box::pin(future));
16-
Arc::new(GodotWaker::new(task_index)).into()
31+
Arc::new(GodotWaker::new(task_index, thread::current().id())).into()
1732
});
1833

1934
waker.wake();
@@ -71,39 +86,56 @@ impl AsyncRuntime {
7186

7287
struct GodotWaker {
7388
runtime_index: usize,
89+
thread_id: ThreadId,
7490
}
7591

7692
impl GodotWaker {
77-
fn new(index: usize) -> Self {
93+
fn new(index: usize, thread_id: ThreadId) -> Self {
7894
Self {
7995
runtime_index: index,
96+
thread_id,
8097
}
8198
}
8299
}
83100

84101
impl Wake for GodotWaker {
85102
fn wake(self: std::sync::Arc<Self>) {
86-
let waker: Waker = self.clone().into();
87-
let mut ctx = Context::from_waker(&waker);
88-
89-
ASYNC_RUNTIME.with_borrow_mut(|rt| {
90-
let Some(future) = rt.get_task(self.runtime_index) else {
91-
godot_error!("Future no longer exists! This is a bug!");
92-
return;
93-
};
94-
95-
// this does currently not support nested tasks.
96-
let result = future.poll(&mut ctx);
97-
match result {
98-
Poll::Pending => (),
99-
Poll::Ready(()) => rt.clear_task(self.runtime_index),
100-
}
103+
let callable = Callable::from_fn("GodotWaker::wake", move |_args| {
104+
let waker: Waker = self.clone().into();
105+
let mut ctx = Context::from_waker(&waker);
106+
107+
ASYNC_RUNTIME.with_borrow_mut(|rt| {
108+
let current_thread = thread::current().id();
109+
110+
if self.thread_id != current_thread {
111+
panic!("trying to poll future on a different thread!\nCurrent Thread: {:?}, Future Thread: {:?}", current_thread, self.thread_id);
112+
}
113+
114+
let Some(future) = rt.get_task(self.runtime_index) else {
115+
godot_error!("Future no longer exists! This is a bug!");
116+
return;
117+
};
118+
119+
// this does currently not support nested tasks.
120+
let result = future.poll(&mut ctx);
121+
match result {
122+
Poll::Pending => (),
123+
Poll::Ready(()) => rt.clear_task(self.runtime_index),
124+
}
125+
});
126+
127+
Ok(Variant::nil())
101128
});
129+
130+
// shedule waker to poll the future on the end of the frame.
131+
callable.to_variant().call("call_deferred", &[]);
102132
}
103133
}
104134

105135
pub struct SignalFuture<R: FromSignalArgs> {
106136
state: Arc<Mutex<(Option<R>, Option<Waker>)>>,
137+
callable: Callable,
138+
signal: Signal,
107139
}
108140

109141
impl<R: FromSignalArgs> SignalFuture<R> {
@@ -112,24 +144,27 @@ impl<R: FromSignalArgs> SignalFuture<R> {
112144
let callback_state = state.clone();
113145

114146
// the callable currently requires that the return value is Sync + Send
115-
signal.connect(
116-
Callable::from_fn("async_task", move |args: &[&Variant]| {
117-
let mut lock = callback_state.lock().unwrap();
118-
let waker = lock.1.take();
147+
let callable = Callable::from_fn("async_task", move |args: &[&Variant]| {
148+
let mut lock = callback_state.lock().unwrap();
149+
let waker = lock.1.take();
119150

120-
lock.0.replace(R::from_args(args));
121-
drop(lock);
151+
lock.0.replace(R::from_args(args));
152+
drop(lock);
122153

123-
if let Some(waker) = waker {
124-
waker.wake();
125-
}
154+
if let Some(waker) = waker {
155+
waker.wake();
156+
}
157+
158+
Ok(Variant::nil())
159+
});
126160

127-
Ok(Variant::nil())
128-
}),
129-
ConnectFlags::ONE_SHOT.ord() as i64,
130-
);
161+
signal.connect(callable.clone(), ConnectFlags::ONE_SHOT.ord() as i64);
131162

132-
Self { state }
163+
Self {
164+
state,
165+
callable,
166+
signal,
167+
}
133168
}
134169
}
135170

@@ -149,6 +184,16 @@ impl<R: FromSignalArgs> Future for SignalFuture<R> {
149184
}
150185
}
151186

187+
impl<R: FromSignalArgs> Drop for SignalFuture<R> {
188+
fn drop(&mut self) {
189+
if !self.signal.is_connected(self.callable.clone()) {
190+
return;
191+
}
192+
193+
self.signal.disconnect(self.callable.clone());
194+
}
195+
}
196+
152197
pub trait FromSignalArgs: Sync + Send + 'static {
153198
fn from_args(args: &[&Variant]) -> Self;
154199
}

0 commit comments

Comments
 (0)