Skip to content

Commit 57fb0cf

Browse files
committed
Async Runtime
1 parent e0f2dca commit 57fb0cf

File tree

3 files changed

+323
-0
lines changed

3 files changed

+323
-0
lines changed
+316
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
/*
2+
* Copyright (c) godot-rust; Bromeon and contributors.
3+
* This Source Code Form is subject to the terms of the Mozilla Public
4+
* License, v. 2.0. If a copy of the MPL was not distributed with this
5+
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
6+
*/
7+
8+
use std::cell::RefCell;
9+
use std::future::Future;
10+
use std::marker::PhantomData;
11+
use std::pin::Pin;
12+
use std::sync::Arc;
13+
use std::task::{Context, Poll, Wake, Waker};
14+
use std::thread::{self, ThreadId};
15+
16+
use crate::builtin::{Callable, Variant};
17+
use crate::meta::ToGodot;
18+
19+
// ----------------------------------------------------------------------------------------------------------------------------------------------
20+
// Public interface
21+
22+
pub fn godot_task(future: impl Future<Output = ()> + 'static) -> TaskHandle {
23+
// Spawning new tasks is only allowed on the main thread for now.
24+
// We can not accept Sync + Send futures since all object references (i.e. Gd<T>) are not thread-safe. So a future has to remain on the
25+
// same thread it was created on. Godots signals on the other hand can be emitted on any thread, so it can't be guaranteed on which thread
26+
// a future will be polled.
27+
// By limiting async tasks to the main thread we can redirect all signal callbacks back to the main thread via `call_deferred`.
28+
//
29+
// Once thread-safe futures are possible the restriction can be lifted.
30+
assert!(
31+
crate::init::is_main_thread(),
32+
"godot_task can only be used on the main thread!"
33+
);
34+
35+
let (task_handle, waker) = ASYNC_RUNTIME.with_borrow_mut(move |rt| {
36+
let task_handle = rt.add_task(Box::pin(future));
37+
let godot_waker = Arc::new(GodotWaker::new(
38+
task_handle.index,
39+
task_handle.id,
40+
thread::current().id(),
41+
));
42+
43+
(task_handle, Waker::from(godot_waker))
44+
});
45+
46+
waker.wake();
47+
task_handle
48+
}
49+
50+
// ----------------------------------------------------------------------------------------------------------------------------------------------
51+
// Async Runtime
52+
53+
thread_local! {
54+
static ASYNC_RUNTIME: RefCell<AsyncRuntime> = RefCell::new(AsyncRuntime::new());
55+
}
56+
57+
/// Will be called during engine shudown.
58+
///
59+
/// We have to drop all the remaining Futures during engine shutdown. This avoids them being dropped at process termination where they would
60+
/// try to access engine resources, which leads to SEGFAULTs.
61+
pub(crate) fn cleanup() {
62+
ASYNC_RUNTIME.take();
63+
}
64+
65+
#[derive(Default)]
66+
enum FutureSlotState<T> {
67+
/// Slot is currently empty.
68+
#[default]
69+
Empty,
70+
/// Slot was previously occupied but the future has been canceled or the slot reused.
71+
Gone,
72+
/// Slot contains a pending future.
73+
Pending(T),
74+
/// Slot contains a future which is currently being polled.
75+
Polling,
76+
}
77+
78+
struct FutureSlot<T> {
79+
value: FutureSlotState<T>,
80+
id: u64,
81+
}
82+
83+
impl<T> FutureSlot<T> {
84+
fn pending(id: u64, value: T) -> Self {
85+
Self {
86+
value: FutureSlotState::Pending(value),
87+
id,
88+
}
89+
}
90+
91+
fn is_empty(&self) -> bool {
92+
matches!(self.value, FutureSlotState::Empty | FutureSlotState::Gone)
93+
}
94+
95+
fn clear(&mut self) {
96+
self.value = FutureSlotState::Gone;
97+
}
98+
99+
fn take(&mut self, id: u64) -> FutureSlotState<T> {
100+
match self.value {
101+
FutureSlotState::Empty => FutureSlotState::Empty,
102+
FutureSlotState::Polling => FutureSlotState::Polling,
103+
FutureSlotState::Gone => FutureSlotState::Gone,
104+
FutureSlotState::Pending(_) if self.id != id => FutureSlotState::Gone,
105+
FutureSlotState::Pending(_) => {
106+
std::mem::replace(&mut self.value, FutureSlotState::Polling)
107+
}
108+
}
109+
}
110+
111+
fn park(&mut self, value: T) {
112+
match self.value {
113+
FutureSlotState::Empty | FutureSlotState::Gone => {
114+
panic!("Future slot is currently unoccupied, future can not be parked here!");
115+
}
116+
FutureSlotState::Pending(_) => {
117+
panic!("Future slot is already occupied by a different future!")
118+
}
119+
FutureSlotState::Polling => {
120+
self.value = FutureSlotState::Pending(value);
121+
}
122+
}
123+
}
124+
}
125+
126+
pub struct TaskHandle {
127+
index: usize,
128+
id: u64,
129+
_pd: PhantomData<*const ()>,
130+
}
131+
132+
impl TaskHandle {
133+
fn new(index: usize, id: u64) -> Self {
134+
Self {
135+
index,
136+
id,
137+
_pd: PhantomData,
138+
}
139+
}
140+
141+
pub fn cancel(self) {
142+
ASYNC_RUNTIME.with_borrow_mut(|rt| {
143+
let Some(task) = rt.tasks.get(self.index) else {
144+
return;
145+
};
146+
147+
let alive = match task.value {
148+
FutureSlotState::Empty => {
149+
panic!("Future slot is empty when canceling it! This is a bug!")
150+
}
151+
FutureSlotState::Gone => false,
152+
FutureSlotState::Pending(_) => task.id == self.id,
153+
FutureSlotState::Polling => panic!("Can not cancel future from inside it!"),
154+
};
155+
156+
if !alive {
157+
return;
158+
}
159+
160+
rt.clear_task(self.index);
161+
})
162+
}
163+
164+
pub fn is_pending(&self) -> bool {
165+
ASYNC_RUNTIME.with_borrow(|rt| {
166+
let slot = rt.tasks.get(self.index).expect("Slot at index must exist!");
167+
168+
if slot.id != self.id {
169+
return false;
170+
}
171+
172+
matches!(
173+
slot.value,
174+
FutureSlotState::Pending(_) | FutureSlotState::Polling
175+
)
176+
})
177+
}
178+
}
179+
180+
#[derive(Default)]
181+
struct AsyncRuntime {
182+
tasks: Vec<FutureSlot<Pin<Box<dyn Future<Output = ()>>>>>,
183+
task_counter: u64,
184+
}
185+
186+
impl AsyncRuntime {
187+
fn new() -> Self {
188+
Self {
189+
tasks: Vec::with_capacity(10),
190+
task_counter: 0,
191+
}
192+
}
193+
194+
fn next_id(&mut self) -> u64 {
195+
let id = self.task_counter;
196+
self.task_counter += 1;
197+
id
198+
}
199+
200+
fn add_task<F: Future<Output = ()> + 'static>(&mut self, future: F) -> TaskHandle {
201+
let id = self.next_id();
202+
let index_slot = self
203+
.tasks
204+
.iter_mut()
205+
.enumerate()
206+
.find(|(_, slot)| slot.is_empty());
207+
208+
let boxed = Box::pin(future);
209+
210+
let index = match index_slot {
211+
Some((index, slot)) => {
212+
*slot = FutureSlot::pending(id, boxed);
213+
index
214+
}
215+
None => {
216+
self.tasks.push(FutureSlot::pending(id, boxed));
217+
self.tasks.len() - 1
218+
}
219+
};
220+
221+
TaskHandle::new(index, id)
222+
}
223+
224+
fn get_task(
225+
&mut self,
226+
index: usize,
227+
id: u64,
228+
) -> FutureSlotState<Pin<Box<dyn Future<Output = ()> + 'static>>> {
229+
let slot = self.tasks.get_mut(index);
230+
231+
slot.map(|inner| inner.take(id)).unwrap_or_default()
232+
}
233+
234+
fn clear_task(&mut self, index: usize) {
235+
self.tasks[index].clear();
236+
}
237+
238+
fn park_task(&mut self, index: usize, future: Pin<Box<dyn Future<Output = ()>>>) {
239+
self.tasks[index].park(future);
240+
}
241+
}
242+
243+
struct GodotWaker {
244+
runtime_index: usize,
245+
task_id: u64,
246+
thread_id: ThreadId,
247+
}
248+
249+
impl GodotWaker {
250+
fn new(index: usize, task_id: u64, thread_id: ThreadId) -> Self {
251+
Self {
252+
runtime_index: index,
253+
thread_id,
254+
task_id,
255+
}
256+
}
257+
}
258+
259+
impl Wake for GodotWaker {
260+
fn wake(self: std::sync::Arc<Self>) {
261+
let callable = Callable::from_local_fn("GodotWaker::wake", move |_args| {
262+
let current_thread = thread::current().id();
263+
264+
assert_eq!(
265+
self.thread_id,
266+
current_thread,
267+
"trying to poll future on a different thread!\n Current thread: {:?}\n Future thread: {:?}",
268+
current_thread,
269+
self.thread_id,
270+
);
271+
272+
let waker = Waker::from(self.clone());
273+
let mut ctx = Context::from_waker(&waker);
274+
275+
// Take future out of the runtime.
276+
let future = ASYNC_RUNTIME.with_borrow_mut(|rt| {
277+
match rt.get_task(self.runtime_index, self.task_id) {
278+
FutureSlotState::Empty => {
279+
panic!("Future slot is empty when waking it! This is a bug!");
280+
},
281+
282+
FutureSlotState::Gone => {
283+
None
284+
}
285+
286+
FutureSlotState::Polling => {
287+
panic!("The same GodotWaker has been called recursively, this is not expected!");
288+
}
289+
290+
FutureSlotState::Pending(future) => Some(future),
291+
}
292+
});
293+
294+
let Some(mut future) = future else {
295+
// Future has been canceled while the waker was already triggered.
296+
return Ok(Variant::nil());
297+
};
298+
299+
let result = future.as_mut().poll(&mut ctx);
300+
301+
// Update the state of the Future in the runtime.
302+
ASYNC_RUNTIME.with_borrow_mut(|rt| match result {
303+
// Future is still pending, so we park it again.
304+
Poll::Pending => rt.park_task(self.runtime_index, future),
305+
306+
// Future has resolved, so we remove it from the runtime.
307+
Poll::Ready(()) => rt.clear_task(self.runtime_index),
308+
});
309+
310+
Ok(Variant::nil())
311+
});
312+
313+
// Schedule waker to poll the Future at the end of the frame.
314+
callable.to_variant().call("call_deferred", &[]);
315+
}
316+
}

godot-core/src/builtin/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ pub mod __prelude_reexport {
155155
use super::*;
156156

157157
pub use aabb::*;
158+
#[cfg(since_api = "4.2")]
159+
pub use async_runtime::{godot_task, TaskHandle};
158160
pub use basis::*;
159161
pub use callable::*;
160162
pub use collections::containers::*;
@@ -203,6 +205,8 @@ mod macros;
203205

204206
// Other modules
205207
mod aabb;
208+
#[cfg(since_api = "4.2")]
209+
pub(crate) mod async_runtime;
206210
mod basis;
207211
mod callable;
208212
mod collections;

godot-core/src/init/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,9 @@ fn gdext_on_level_deinit(level: InitLevel) {
172172
// If lowest level is unloaded, call global deinitialization.
173173
// No business logic by itself, but ensures consistency if re-initialization (hot-reload on Linux) occurs.
174174

175+
#[cfg(since_api = "4.2")]
176+
crate::builtin::async_runtime::cleanup();
177+
175178
// Garbage-collect various statics.
176179
// SAFETY: this is the last time meta APIs are used.
177180
unsafe {

0 commit comments

Comments
 (0)