Skip to content

Commit f79a2c0

Browse files
committed
Return task handle to cancel pending tasks
1 parent a3c28c7 commit f79a2c0

File tree

2 files changed

+166
-36
lines changed

2 files changed

+166
-36
lines changed

godot-core/src/tools/async_support.rs

+146-36
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::cell::RefCell;
22
use std::future::Future;
3+
use std::marker::PhantomData;
34
use std::pin::Pin;
45
use std::sync::{Arc, Mutex};
56
use std::task::{Context, Poll, Wake, Waker};
@@ -8,10 +9,11 @@ use std::thread::{self, ThreadId};
89
use crate::builtin::{Callable, Signal, Variant};
910
use crate::classes::object::ConnectFlags;
1011
use crate::classes::Os;
12+
use crate::godot_warn;
1113
use crate::meta::{FromGodot, ToGodot};
1214
use crate::obj::EngineEnum;
1315

14-
pub fn godot_task(future: impl Future<Output = ()> + 'static) {
16+
pub fn godot_task(future: impl Future<Output = ()> + 'static) -> TaskHandle {
1517
let os = Os::singleton();
1618

1719
// Spawning new tasks is only allowed on the main thread for now.
@@ -22,71 +24,162 @@ pub fn godot_task(future: impl Future<Output = ()> + 'static) {
2224
//
2325
// Once thread-safe futures are possible the restriction can be lifted.
2426
if os.get_thread_caller_id() != os.get_main_thread_id() {
25-
return;
27+
panic!("godot_task can only be used on the main thread!");
2628
}
2729

28-
let waker: Waker = ASYNC_RUNTIME.with_borrow_mut(move |rt| {
29-
let task_index = rt.add_task(Box::pin(future));
30-
Arc::new(GodotWaker::new(task_index, thread::current().id())).into()
30+
let (task_handle, waker): (_, Waker) = ASYNC_RUNTIME.with_borrow_mut(move |rt| {
31+
let task_handle = rt.add_task(Box::pin(future));
32+
let waker = Arc::new(GodotWaker::new(
33+
task_handle.index,
34+
task_handle.id,
35+
thread::current().id(),
36+
))
37+
.into();
38+
39+
(task_handle, waker)
3140
});
3241

3342
waker.wake();
43+
task_handle
3444
}
3545

3646
thread_local! { pub(crate) static ASYNC_RUNTIME: RefCell<AsyncRuntime> = RefCell::new(AsyncRuntime::new()); }
3747

3848
#[derive(Default)]
39-
enum FutureSlot<T> {
49+
enum FutureSlotState<T> {
50+
/// Slot is currently empty.
4051
#[default]
4152
Empty,
53+
/// Slot was previously occupied but the future has been canceled or the slot reused.
54+
Gone,
55+
/// Slot contains a pending future.
4256
Pending(T),
57+
/// slot contains a future which is currently being polled.
4358
Polling,
4459
}
4560

61+
struct FutureSlot<T> {
62+
value: FutureSlotState<T>,
63+
id: u64,
64+
}
65+
4666
impl<T> FutureSlot<T> {
67+
fn pending(id: u64, value: T) -> Self {
68+
Self {
69+
value: FutureSlotState::Pending(value),
70+
id,
71+
}
72+
}
73+
4774
fn is_empty(&self) -> bool {
48-
matches!(self, Self::Empty)
75+
matches!(self.value, FutureSlotState::Empty | FutureSlotState::Gone)
4976
}
5077

5178
fn clear(&mut self) {
52-
*self = Self::Empty;
79+
self.value = FutureSlotState::Empty;
5380
}
5481

55-
fn take(&mut self) -> Self {
56-
match self {
57-
Self::Empty => Self::Empty,
58-
Self::Pending(_) => std::mem::replace(self, Self::Polling),
59-
Self::Polling => Self::Polling,
82+
fn cancel(&mut self) {
83+
self.value = FutureSlotState::Gone;
84+
}
85+
86+
fn take(&mut self, id: u64) -> FutureSlotState<T> {
87+
match self.value {
88+
FutureSlotState::Empty => FutureSlotState::Empty,
89+
FutureSlotState::Polling => FutureSlotState::Polling,
90+
FutureSlotState::Gone => FutureSlotState::Gone,
91+
FutureSlotState::Pending(_) if self.id != id => FutureSlotState::Gone,
92+
FutureSlotState::Pending(_) => {
93+
std::mem::replace(&mut self.value, FutureSlotState::Polling)
94+
}
6095
}
6196
}
6297

6398
fn park(&mut self, value: T) {
64-
match self {
65-
Self::Empty => {
99+
match self.value {
100+
FutureSlotState::Empty | FutureSlotState::Gone => {
66101
panic!("Future slot is currently unoccupied, future can not be parked here!");
67102
}
68-
69-
Self::Pending(_) => panic!("Future slot is already occupied by a different future!"),
70-
Self::Polling => {
71-
*self = Self::Pending(value);
103+
FutureSlotState::Pending(_) => {
104+
panic!("Future slot is already occupied by a different future!")
105+
}
106+
FutureSlotState::Polling => {
107+
self.value = FutureSlotState::Pending(value);
72108
}
73109
}
74110
}
75111
}
76112

113+
pub struct TaskHandle {
114+
index: usize,
115+
id: u64,
116+
_pd: PhantomData<*const ()>,
117+
}
118+
119+
impl TaskHandle {
120+
fn new(index: usize, id: u64) -> Self {
121+
Self {
122+
index,
123+
id,
124+
_pd: PhantomData,
125+
}
126+
}
127+
128+
pub fn cancel(self) {
129+
ASYNC_RUNTIME.with_borrow_mut(|rt| {
130+
let Some(task) = rt.tasks.get(self.index) else {
131+
return;
132+
};
133+
134+
let alive = match task.value {
135+
FutureSlotState::Empty | FutureSlotState::Gone => false,
136+
FutureSlotState::Pending(_) => task.id == self.id,
137+
FutureSlotState::Polling => panic!("Can not cancel future from inside it!"),
138+
};
139+
140+
if !alive {
141+
return;
142+
}
143+
144+
rt.cancel_task(self.index);
145+
})
146+
}
147+
148+
pub fn is_pending(&self) -> bool {
149+
ASYNC_RUNTIME.with_borrow(|rt| {
150+
let slot = rt.tasks.get(self.index).expect("Slot at index must exist!");
151+
152+
if slot.id != self.id {
153+
return false;
154+
}
155+
156+
matches!(slot.value, FutureSlotState::Pending(_))
157+
})
158+
}
159+
}
160+
77161
#[derive(Default)]
78162
pub(crate) struct AsyncRuntime {
79163
tasks: Vec<FutureSlot<Pin<Box<dyn Future<Output = ()>>>>>,
164+
task_counter: u64,
80165
}
81166

82167
impl AsyncRuntime {
83168
fn new() -> Self {
84169
Self {
85170
tasks: Vec::with_capacity(10),
171+
task_counter: 0,
86172
}
87173
}
88174

89-
fn add_task<F: Future<Output = ()> + 'static>(&mut self, future: F) -> usize {
175+
fn next_id(&mut self) -> u64 {
176+
let id = self.task_counter;
177+
self.task_counter += 1;
178+
id
179+
}
180+
181+
fn add_task<F: Future<Output = ()> + 'static>(&mut self, future: F) -> TaskHandle {
182+
let id = self.next_id();
90183
let slot = self
91184
.tasks
92185
.iter_mut()
@@ -95,33 +188,36 @@ impl AsyncRuntime {
95188

96189
let boxed = Box::pin(future);
97190

98-
match slot {
191+
let index = match slot {
99192
Some((index, slot)) => {
100-
*slot = FutureSlot::Pending(boxed);
193+
*slot = FutureSlot::pending(id, boxed);
101194
index
102195
}
103196
None => {
104-
self.tasks.push(FutureSlot::Pending(boxed));
197+
self.tasks.push(FutureSlot::pending(id, boxed));
105198
self.tasks.len() - 1
106199
}
107-
}
200+
};
201+
202+
TaskHandle::new(index, id)
108203
}
109204

110205
fn get_task(
111206
&mut self,
112207
index: usize,
113-
) -> FutureSlot<Pin<Box<dyn Future<Output = ()> + 'static>>> {
208+
id: u64,
209+
) -> FutureSlotState<Pin<Box<dyn Future<Output = ()> + 'static>>> {
114210
let slot = self.tasks.get_mut(index);
115211

116-
slot.map(|inner| inner.take()).unwrap_or_default()
212+
slot.map(|inner| inner.take(id)).unwrap_or_default()
117213
}
118214

119215
fn clear_task(&mut self, index: usize) {
120-
if index >= self.tasks.len() {
121-
return;
122-
}
216+
self.tasks[index].clear();
217+
}
123218

124-
self.tasks[0].clear();
219+
fn cancel_task(&mut self, index: usize) {
220+
self.tasks[index].cancel();
125221
}
126222

127223
fn park_task(&mut self, index: usize, future: Pin<Box<dyn Future<Output = ()>>>) {
@@ -131,14 +227,16 @@ impl AsyncRuntime {
131227

132228
struct GodotWaker {
133229
runtime_index: usize,
230+
task_id: u64,
134231
thread_id: ThreadId,
135232
}
136233

137234
impl GodotWaker {
138-
fn new(index: usize, thread_id: ThreadId) -> Self {
235+
fn new(index: usize, task_id: u64, thread_id: ThreadId) -> Self {
139236
Self {
140237
runtime_index: index,
141238
thread_id,
239+
task_id,
142240
}
143241
}
144242
}
@@ -156,20 +254,29 @@ impl Wake for GodotWaker {
156254
let mut ctx = Context::from_waker(&waker);
157255

158256
// take future out of the runtime.
159-
let mut future = ASYNC_RUNTIME.with_borrow_mut(|rt| {
160-
match rt.get_task(self.runtime_index) {
161-
FutureSlot::Empty => {
257+
let future = ASYNC_RUNTIME.with_borrow_mut(|rt| {
258+
match rt.get_task(self.runtime_index, self.task_id) {
259+
FutureSlotState::Empty => {
162260
panic!("Future no longer exists when waking it! This is a bug!");
163261
},
164262

165-
FutureSlot::Polling => {
263+
FutureSlotState::Gone => {
264+
None
265+
}
266+
267+
FutureSlotState::Polling => {
166268
panic!("The same GodotWaker has been called recursively, this is not expected!");
167269
}
168270

169-
FutureSlot::Pending(future) => future
271+
FutureSlotState::Pending(future) => Some(future),
170272
}
171273
});
172274

275+
let Some(mut future) = future else {
276+
// future has been canceled while the waker was already triggered.
277+
return Ok(Variant::nil());
278+
};
279+
173280
let result = future.as_mut().poll(&mut ctx);
174281

175282
// update runtime.
@@ -241,14 +348,17 @@ impl<R: FromSignalArgs> Future for SignalFuture<R> {
241348
impl<R: FromSignalArgs> Drop for SignalFuture<R> {
242349
fn drop(&mut self) {
243350
if !self.callable.is_valid() {
351+
godot_warn!("dropping furure but callable no longer exists!");
244352
return;
245353
}
246354

247355
if self.signal.object().is_none() {
356+
godot_warn!("dropping furure but signal owner no longer exists!");
248357
return;
249358
}
250359

251360
if self.signal.is_connected(self.callable.clone()) {
361+
godot_warn!("dropping furure but signal still connected!");
252362
self.signal.disconnect(self.callable.clone());
253363
}
254364
}

itest/rust/src/engine_tests/async_test.rs

+20
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,23 @@ fn start_async_task() {
4646
});
4747
godot_print!("after godot_task...");
4848
}
49+
50+
#[itest]
51+
fn cancel_async_task() {
52+
let tree = Engine::singleton()
53+
.get_main_loop()
54+
.unwrap()
55+
.cast::<SceneTree>();
56+
57+
let signal = Signal::from_object_signal(&tree, "process_frame");
58+
59+
let handle = godot_task(async move {
60+
godot_print!("starting task to be canceled...");
61+
62+
let _: () = signal.to_future().await;
63+
64+
unreachable!();
65+
});
66+
67+
handle.cancel();
68+
}

0 commit comments

Comments
 (0)