-
Notifications
You must be signed in to change notification settings - Fork 517
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* feat: Executor Signed-off-by: Xuanwo <[email protected]> * Update Signed-off-by: Xuanwo <[email protected]> * Fix spelling Signed-off-by: Xuanwo <[email protected]> * Update tracking issues Signed-off-by: Xuanwo <[email protected]> * Update 4638_executor.md Co-authored-by: tison <[email protected]> * Polish code example Signed-off-by: Xuanwo <[email protected]> * Fix typo Signed-off-by: Xuanwo <[email protected]> --------- Signed-off-by: Xuanwo <[email protected]> Co-authored-by: tison <[email protected]>
- Loading branch information
Showing
2 changed files
with
219 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,215 @@ | ||
- Proposal Name: `executor` | ||
- Start Date: 2024-05-23 | ||
- RFC PR: [apache/opendal#4638](https://github.com/apache/opendal/pull/4638) | ||
- Tracking Issue: [apache/opendal#4639](https://github.com/apache/opendal/issues/4639) | ||
|
||
# Summary | ||
|
||
Add executor in opendal to allow running tasks concurrently in background. | ||
|
||
# Motivation | ||
|
||
OpenDAL offers top-tier support for concurrent execution, allowing tasks to run simultaneously in the background. Users can easily enable concurrent file read/write operations with just one line of code: | ||
|
||
```diff | ||
let mut w = op | ||
.writer_with(path) | ||
.chunk(8 * 1024 * 1024) // 8 MiB per chunk | ||
+ .concurrent(16) // 16 concurrent tasks | ||
.await?; | ||
|
||
w.write(bs).await?; | ||
w.write(bs).await?; // The submitted tasks only be executed while user calling `write`. | ||
... | ||
sleep(Duration::from_secs(10)).await; // The submitted tasks make no progress during `sleep`. | ||
... | ||
w.close().await?; | ||
``` | ||
|
||
However, the execution of those tasks relies on users continuously calling `write`. They cannot run tasks concurrently in the background. (I explained the technical details in the `Rationale and alternatives` section.) | ||
|
||
This can result in the following issues: | ||
|
||
- Task latency may increase as tasks are not executed until the task queue is full. | ||
- Memory usage may be high because all chunks must be held in memory until the task is completed. | ||
|
||
I propose introducing an executor abstraction in OpenDAL to enable concurrent background task execution. The executor will automatically manage the tasks in the background without requiring users to drive the progress manually. | ||
|
||
# Guide-level explanation | ||
|
||
OpenDAL will add a new `Executor` struct to manage concurrent tasks. | ||
|
||
```rust | ||
pub struct Executor { | ||
... | ||
} | ||
|
||
pub struct Task { | ||
... | ||
} | ||
|
||
impl Executor { | ||
/// Create a new tokio based executor. | ||
pub fn new() -> Self { ... } | ||
|
||
/// Create a new executor with given execute impl. | ||
pub fn with(exec: Arc<dyn Execute>) -> Self { ... } | ||
|
||
/// Run given future in background immediately. | ||
pub fn execute<F>(&self, f: F) -> Task<F::Output> | ||
where | ||
F: Future + Send + 'static, | ||
{ | ||
... | ||
} | ||
} | ||
``` | ||
|
||
The `Executor` uses the `tokio` runtime by default but users can also provide their own runtime by: | ||
|
||
```rust | ||
pub trait Execute { | ||
fn execute(&self, f: BoxedFuture<()>) -> Result<()>; | ||
} | ||
``` | ||
|
||
Users can set executor in `OpWrite` / `OpRead` to enable concurrent background task execution: | ||
|
||
```rust | ||
+ let exec = Executor::new(); | ||
let w = op | ||
.writer_with(path) | ||
.chunk(8 * 1024 * 1024) // 8 MiB per chunk | ||
.concurrent(16) // 16 concurrent tasks | ||
+ .executor(exec) // Use specified executor | ||
.await?; | ||
``` | ||
|
||
Specifying an executor every time is cumbersome. Users can also set a global executor for given operator: | ||
|
||
```rust | ||
+ let exec = Executor::new(); | ||
+ let op = op.with_default_executor(exec); | ||
|
||
let w = op | ||
.writer_with(path) | ||
.chunk(8 * 1024 * 1024) // 8 MiB per chunk | ||
.concurrent(16) // 16 concurrent tasks | ||
.await?; | ||
``` | ||
|
||
# Reference-level explanation | ||
|
||
As mentioned in the `Guide-level explanation`, the `Executor` struct will manage concurrent tasks in the background. `Executor` will be powered by trait `Execute` to support different underlying runtimes. To make trait `Execute` object safe, we only accept `BoxedFuture<()>` as input. `Executor` will handle the future output and return the result to the caller. | ||
|
||
Operations that supporting concurrent execution will add a new field: | ||
|
||
```rust | ||
pub struct OpXxx { | ||
... | ||
executor: Option<Executor>, | ||
} | ||
``` | ||
|
||
Operator will add a new field to store the default executor: | ||
|
||
```rust | ||
pub struct Operator { | ||
... | ||
default_executor: Option<Executor>, | ||
} | ||
``` | ||
|
||
The `Task` spawned by `Executor` will be a future that can be awaited to fetch the result: | ||
|
||
```rust | ||
let res = task.await; | ||
``` | ||
|
||
The task will be executed immediately after calling `execute`. Users can also cancel the task by dropping the `Task` object. Users don't need to poll those `Task` object to make progress. | ||
|
||
# Drawbacks | ||
|
||
## Complexity | ||
|
||
To support concurrent execution, we need to introduce: | ||
|
||
- a new `Executor` struct | ||
- a new `Task` struct | ||
- a new `Execute` trait | ||
|
||
This may increase the complexity of the codebase. | ||
|
||
# Rationale and alternatives | ||
|
||
## Why introducing so many new abstractions? | ||
|
||
We need to introduce new abstractions to support concurrent execution across different runtimes. Unfortunately, this is the current reality of async rust. | ||
|
||
Supporting just one or two runtimes by adding features is much easier. Supporting only Tokio is extremely simple, requiring about 10 lines of changes. However, this violates our vision of free data access. | ||
|
||
Firstly, we don't want to force our users to use Tokio. We aim to support all runtimes, including async-std, smol, and others. | ||
|
||
Secondly, OpenDAL should be capable of running in any environment, including embedded systems. We don’t want to restrict our users to a specific runtime. | ||
|
||
Finally, users may have their own preferences for observability and performance in their runtime. We intend to accommodate these needs effortlessly. | ||
|
||
## Why `ConcurrentFutures` doesn't work? | ||
|
||
`ConcurrentFutures` is a `Vec<impl Future>`, users need to keep calling `poll_next` to make progress. This is not suitable for our use case. We need a way to run tasks in the background without user intervention. | ||
|
||
> I've heard that futures will wake up when they're ready, and it's the runtime's job to poll them, right? | ||
No, it's partially correct. The runtime will wake up the future when it's ready, but it's the user's job to poll the future. The runtime will not poll the future automatically unless it's managed by the runtime. | ||
|
||
For tokio, that means all futures provided by tokio, like `tokio::time::Sleep`, will be polled by tokio runtime. However, if you create a future by yourself, you need to poll it manually. | ||
|
||
I have an example to explain this: | ||
|
||
*Try it at [playground](https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=628e67adef90128151e175d22c87808e)* | ||
|
||
```rust | ||
use futures::stream::FuturesUnordered; | ||
use futures::StreamExt; | ||
use std::time::Duration; | ||
use tokio::time::{sleep, Instant}; | ||
|
||
#[tokio::main] | ||
async fn main() { | ||
let now = Instant::now(); | ||
let mut cf = FuturesUnordered::new(); | ||
|
||
// cf.push(Box::pin(sleep(Duration::from_secs(3)))); | ||
cf.push(Box::pin(async move { | ||
sleep(Duration::from_secs(3)).await; | ||
println!("async task finished at {}s", now.elapsed().as_secs_f64()); | ||
})); | ||
sleep(Duration::from_secs(4)).await; | ||
println!("outer sleep finished at {}s", now.elapsed().as_secs_f64()); | ||
|
||
let _: Vec<()> = cf.collect().await; | ||
println!("consumed: {}s", now.elapsed().as_secs_f64()) | ||
} | ||
``` | ||
|
||
# Prior art | ||
|
||
None. | ||
|
||
# Unresolved questions | ||
|
||
None. | ||
|
||
# Future possibilities | ||
|
||
## Blocking Executor | ||
|
||
This proposal mainly focuses on async tasks. However, we can also consider adding blocking support to `Executor`. Users can use concurrent tasks in blocking context too: | ||
|
||
```rust | ||
let w = op | ||
.writer_with(path) | ||
.chunk(8 * 1024 * 1024) // 8 MiB per chunk | ||
+ .concurrent(16) // 16 concurrent tasks | ||
.do()?; | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters