Skip to content

Commit 28464e6

Browse files
committed
Async blocking task support
Added the `BlockingTaskQueue` type BlockingTaskQueue allows a Rust closure to be scheduled on a foreign thread where blocking operations are okay. The closure runs inside the parent future, which is nice because it allows the closure to reference its outside scope. On the foreign side, a `BlockingTaskQueue` is a native type that runs a task in some sort of thread queue (`DispatchQueue`, `CoroutineContext`, `futures.Executor`, etc.). Updated some of the foreign code so that the handle-maps used for callback interfaces can also be used for this. It's quite messy right now, but mozilla#1823 should clean things up. Renamed `Handle` to `UniffiHandle` on Kotlin, since `Handle` can easily conflict with user names. In fact it did on the `custom_types` fixture, the only reason that this worked before was that that fixture didn't use callback interfaces. Added new tests for this in the futures fixtures. Updated the tests to check that handles are being released properly.
1 parent 5308aa0 commit 28464e6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+1277
-318
lines changed

Cargo.lock

Lines changed: 75 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/manual/src/futures.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,63 @@ This code uses `asyncio` to drive the future to completion, while our exposed fu
4545
In Rust `Future` terminology this means the foreign bindings supply the "executor" - think event-loop, or async runtime. In this example it's `asyncio`. There's no requirement for a Rust event loop.
4646

4747
There are [some great API docs](https://docs.rs/uniffi_core/latest/uniffi_core/ffi/rustfuture/index.html) on the implementation that are well worth a read.
48+
49+
## Blocking tasks
50+
51+
Rust executors are designed around an assumption that the `Future::poll` function will return quickly.
52+
This assumption, combined with cooperative scheduling, allows for a large number of futures to be handled by a small number of threads.
53+
Foreign executors make similar assumptions and sometimes more extreme ones.
54+
For example, the Python eventloop is single threaded -- if any task spends a long time between `await` points, then it will block all other tasks from progressing.
55+
56+
This raises the question of how async code can interact with blocking code that performs blocking IO, long-running computations without `await` breaks, etc.
57+
UniFFI defines the `BlockingTaskQueue` type, which is a foreign object that schedules work on a thread where it's okay to block.
58+
59+
On Rust, `BlockingTaskQueue` is a UniFFI type that can safely run blocking code.
60+
It's `execute` method works like tokio's [block_in_place](https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html) function.
61+
It inputs a closure and runs it in the `BlockingTaskQueue`.
62+
This closure can reference the outside scope (i.e. it does not need to be `'static`).
63+
For example:
64+
65+
```rust
66+
#[derive(uniffi::Object)]
67+
struct DataStore {
68+
// Used to run blocking tasks
69+
queue: uniffi::BlockingTaskQueue,
70+
// Low-level DB object with blocking methods
71+
db: Mutex<Database>,
72+
}
73+
74+
#[uniffi::export]
75+
impl DataStore {
76+
#[uniffi::constructor]
77+
fn new(queue: uniffi::BlockingTaskQueue) -> Self {
78+
Self {
79+
queue,
80+
db: Mutex::new(Database::new())
81+
}
82+
}
83+
84+
fn fetch_all_items(&self) -> Vec<DbItem> {
85+
self.queue.execute(|| self.db.lock().fetch_all_items())
86+
}
87+
}
88+
```
89+
90+
On the foreign side `BlockingTaskQueue` corresponds to a language-dependent class.
91+
92+
### Kotlin
93+
Kotlin uses `CoroutineContext` for its `BlockingTaskQueue`.
94+
Any `CoroutineContext` will work, but `Dispatchers.IO` is usually a good choice.
95+
A DataStore from the example above can be created with `DataStore(Dispatchers.IO)`.
96+
97+
### Swift
98+
Swift uses `DispatchQueue` for its `BlockingTaskQueue`.
99+
The user-initiated global queue is normally a good choice.
100+
A DataStore from the example above can be created with `DataStore(queue: DispatchQueue.global(qos: .userInitiated)`.
101+
The `DispatchQueue` should be concurrent.
102+
103+
### Python
104+
105+
Python uses a `futures.Executor` for its `BlockingTaskQueue`.
106+
`ThreadPoolExecutor` is typically a good choice.
107+
A DataStore from the example above can be created with `DataStore(ThreadPoolExecutor())`.

fixtures/futures/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ path = "src/bin.rs"
1616

1717
[dependencies]
1818
uniffi = { path = "../../uniffi", version = "0.25", features = ["tokio", "cli"] }
19+
futures = "0.3.29"
1920
thiserror = "1.0"
2021
tokio = { version = "1.24.1", features = ["time", "sync"] }
2122
once_cell = "1.18.0"

fixtures/futures/src/lib.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use std::{
1111
time::Duration,
1212
};
1313

14+
use futures::stream::{FuturesUnordered, StreamExt};
15+
1416
/// Non-blocking timer future.
1517
pub struct TimerFuture {
1618
shared_state: Arc<Mutex<SharedState>>,
@@ -326,4 +328,58 @@ pub async fn use_shared_resource(options: SharedResourceOptions) -> Result<(), A
326328
Ok(())
327329
}
328330

331+
/// Async function that uses a blocking task queue to do its work
332+
#[uniffi::export]
333+
pub async fn calc_square(queue: uniffi::BlockingTaskQueue, value: i32) -> i32 {
334+
queue.execute(|| value * value).await
335+
}
336+
337+
/// Same as before, but this one runs multiple tasks
338+
#[uniffi::export]
339+
pub async fn calc_squares(queue: uniffi::BlockingTaskQueue, items: Vec<i32>) -> Vec<i32> {
340+
// Use `FuturesUnordered` to test our blocking task queue code which is known to be a tricky API to work with.
341+
// In particular, if we don't notify the waker then FuturesUnordered will not poll again.
342+
let mut futures: FuturesUnordered<_> = (0..items.len())
343+
.map(|i| {
344+
// Test that we can use references from the surrounding scope
345+
let items = &items;
346+
queue.execute(move || items[i] * items[i])
347+
})
348+
.collect();
349+
let mut results = vec![];
350+
while let Some(result) = futures.next().await {
351+
results.push(result);
352+
}
353+
results.sort();
354+
results
355+
}
356+
357+
/// ...and this one uses multiple BlockingTaskQueues
358+
#[uniffi::export]
359+
pub async fn calc_squares_multi_queue(
360+
queues: Vec<uniffi::BlockingTaskQueue>,
361+
items: Vec<i32>,
362+
) -> Vec<i32> {
363+
let mut futures: FuturesUnordered<_> = (0..items.len())
364+
.map(|i| {
365+
// Test that we can use references from the surrounding scope
366+
let items = &items;
367+
queues[i].execute(move || items[i] * items[i])
368+
})
369+
.collect();
370+
let mut results = vec![];
371+
while let Some(result) = futures.next().await {
372+
results.push(result);
373+
}
374+
results.sort();
375+
results
376+
}
377+
378+
/// Like calc_square, but it clones the BlockingTaskQueue first then drops both copies. Used to
379+
/// test that a) the clone works and b) we correctly drop the references.
380+
#[uniffi::export]
381+
pub async fn calc_square_with_clone(queue: uniffi::BlockingTaskQueue, value: i32) -> i32 {
382+
queue.clone().execute(|| value * value).await
383+
}
384+
329385
uniffi::include_scaffolding!("futures");

0 commit comments

Comments
 (0)