From bfe9e4499359edc5d5d9307efe26e78aafd50801 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 27 May 2024 19:01:49 +0800 Subject: [PATCH] feat: Add Executor struct and Execute trait (#4648) * feat: Add Executor struct and Execute trait Signed-off-by: Xuanwo * Fix licenses Signed-off-by: Xuanwo --------- Signed-off-by: Xuanwo --- core/Cargo.toml | 111 +++++++++--------- core/src/types/execute/api.rs | 65 ++++++++++ core/src/types/execute/executor.rs | 70 +++++++++++ core/src/types/execute/executors/mod.rs | 27 +++++ .../types/execute/executors/tokio_executor.rs | 60 ++++++++++ core/src/types/execute/mod.rs | 25 ++++ core/src/types/mod.rs | 3 + 7 files changed, 307 insertions(+), 54 deletions(-) create mode 100644 core/src/types/execute/api.rs create mode 100644 core/src/types/execute/executor.rs create mode 100644 core/src/types/execute/executors/mod.rs create mode 100644 core/src/types/execute/executors/tokio_executor.rs create mode 100644 core/src/types/execute/mod.rs diff --git a/core/Cargo.toml b/core/Cargo.toml index 059c48c24eff..73c53e6ac377 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -39,7 +39,7 @@ default-members = ["."] members = [".", "fuzz", "edge/*", "benches/vs_*"] [features] -default = ["reqwest/rustls-tls", "services-memory"] +default = ["reqwest/rustls-tls", "executors-tokio", "services-memory"] # Build test utils or not. # @@ -48,15 +48,15 @@ default = ["reqwest/rustls-tls", "services-memory"] # # You should never enable this feature unless you are developing opendal. tests = [ - "dep:rand", - "dep:sha2", - "dep:dotenvy", - "layers-blocking", - "services-azblob", - "services-fs", - "services-http", - "services-memory", - "services-s3", + "dep:rand", + "dep:sha2", + "dep:dotenvy", + "layers-blocking", + "services-azblob", + "services-fs", + "services-http", + "services-memory", + "services-s3", ] # Enable path cache. @@ -65,6 +65,9 @@ internal-path-cache = ["dep:moka"] # Enable tokio runtime. internal-tokio-rt = ["tokio/rt-multi-thread"] +# Enable tokio executors support. +executors-tokio = ["tokio/rt"] + # Enable layers chaos support layers-chaos = ["dep:rand"] # Enable layers metrics support @@ -94,20 +97,20 @@ layers-dtrace = ["dep:probe"] services-alluxio = [] services-atomicserver = ["dep:atomic_lib"] services-azblob = [ - "dep:sha2", - "dep:reqsign", - "reqsign?/services-azblob", - "reqsign?/reqwest_request", + "dep:sha2", + "dep:reqsign", + "reqsign?/services-azblob", + "reqsign?/reqwest_request", ] services-azdls = [ - "dep:reqsign", - "reqsign?/services-azblob", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-azblob", + "reqsign?/reqwest_request", ] services-azfile = [ - "dep:reqsign", - "reqsign?/services-azblob", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-azblob", + "reqsign?/reqwest_request", ] services-b2 = [] services-cacache = ["dep:cacache"] @@ -115,9 +118,9 @@ services-chainsafe = [] services-cloudflare-kv = [] services-compfs = ["dep:compio"] services-cos = [ - "dep:reqsign", - "reqsign?/services-tencent", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-tencent", + "reqsign?/reqwest_request", ] services-d1 = [] services-dashmap = ["dep:dashmap"] @@ -128,9 +131,9 @@ services-foundationdb = ["dep:foundationdb"] services-fs = ["tokio/fs"] services-ftp = ["dep:suppaftp", "dep:bb8", "dep:async-tls"] services-gcs = [ - "dep:reqsign", - "reqsign?/services-google", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-google", + "reqsign?/reqwest_request", ] services-gdrive = ["internal-path-cache"] services-ghac = [] @@ -152,15 +155,15 @@ services-moka = ["dep:moka"] services-mongodb = ["dep:mongodb"] services-mysql = ["dep:mysql_async"] services-obs = [ - "dep:reqsign", - "reqsign?/services-huaweicloud", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-huaweicloud", + "reqsign?/reqwest_request", ] services-onedrive = [] services-oss = [ - "dep:reqsign", - "reqsign?/services-aliyun", - "reqsign?/reqwest_request", + "dep:reqsign", + "reqsign?/services-aliyun", + "reqsign?/reqwest_request", ] services-pcloud = [] services-persy = ["dep:persy", "internal-tokio-rt"] @@ -170,10 +173,10 @@ services-redis = ["dep:redis", "redis?/tokio-rustls-comp"] services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"] services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"] services-s3 = [ - "dep:reqsign", - "reqsign?/services-aws", - "reqsign?/reqwest_request", - "dep:crc32c", + "dep:reqsign", + "reqsign?/services-aws", + "reqsign?/reqwest_request", + "dep:crc32c", ] services-seafile = [] services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:bb8"] @@ -225,8 +228,8 @@ backon = "0.4.3" base64 = "0.22" bytes = "1.6" chrono = { version = "0.4.28", default-features = false, features = [ - "clock", - "std", + "clock", + "std", ] } flagset = "0.4" futures = { version = "0.3", default-features = false, features = ["std"] } @@ -238,7 +241,7 @@ once_cell = "1" percent-encoding = "2" quick-xml = { version = "0.31", features = ["serialize", "overlapped-lists"] } reqwest = { version = "0.12.2", features = [ - "stream", + "stream", ], default-features = false } serde = { version = "1", features = ["derive"] } serde_json = "1" @@ -269,8 +272,8 @@ bb8-postgres = { version = "0.8.1", optional = true } tokio-postgres = { version = "0.7.8", optional = true } # for services-cacache cacache = { version = "13.0", default-features = false, features = [ - "tokio-runtime", - "mmap", + "tokio-runtime", + "mmap", ], optional = true } # for services-dashmap dashmap = { version = "5.4", optional = true } @@ -278,7 +281,7 @@ dashmap = { version = "5.4", optional = true } etcd-client = { version = "0.12", optional = true, features = ["tls"] } # for services-foundationdb foundationdb = { version = "0.8.0", features = [ - "embedded-fdb-include", + "embedded-fdb-include", ], optional = true } # for services-hdfs hdrs = { version = "0.3.2", optional = true, features = ["async_file"] } @@ -294,13 +297,13 @@ moka = { version = "0.12", optional = true, features = ["future", "sync"] } mongodb = { version = "2.8.1", optional = true, features = ["tokio-runtime"] } # for services-mysql mysql_async = { version = "0.32.2", default-features = false, features = [ - "default-rustls", + "default-rustls", ], optional = true } # for services-sftp openssh = { version = "0.10.0", optional = true } openssh-sftp-client = { version = "0.14.0", optional = true, features = [ - "openssh", - "tracing", + "openssh", + "tracing", ] } # for services-persy persy = { version = "1.4.6", optional = true } @@ -308,9 +311,9 @@ persy = { version = "1.4.6", optional = true } redb = { version = "1.1.0", optional = true } # for services-redis redis = { version = "0.23.1", features = [ - "cluster-async", - "tokio-comp", - "connection-manager", + "cluster-async", + "tokio-comp", + "connection-manager", ], optional = true } # for services-rocksdb rocksdb = { version = "0.21.0", default-features = false, optional = true } @@ -320,9 +323,9 @@ rusqlite = { version = "0.31.0", optional = true, features = ["bundled"] } sled = { version = "0.34.7", optional = true } # for services-ftp suppaftp = { version = "5.3.1", default-features = false, features = [ - "async-secure", - "rustls", - "async-rustls", + "async-secure", + "rustls", + "async-rustls", ], optional = true } # for services-tikv tikv-client = { version = "0.3.0", optional = true, default-features = false } @@ -368,7 +371,7 @@ dotenvy = "0.15" libtest-mimic = "0.6" minitrace = { version = "0.6", features = ["enable"] } opentelemetry = { version = "0.21", default-features = false, features = [ - "trace", + "trace", ] } pretty_assertions = "1" rand = "0.8" @@ -376,6 +379,6 @@ sha2 = "0.10" size = "0.4" tokio = { version = "1.27", features = ["fs", "macros", "rt-multi-thread"] } tracing-subscriber = { version = "0.3", features = [ - "env-filter", - "tracing-log", + "env-filter", + "tracing-log", ] } diff --git a/core/src/types/execute/api.rs b/core/src/types/execute/api.rs new file mode 100644 index 000000000000..d9c4b926f87d --- /dev/null +++ b/core/src/types/execute/api.rs @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::raw::BoxedStaticFuture; +use crate::*; +use futures::future::RemoteHandle; +use futures::FutureExt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Execute trait is used to execute task in background. +pub trait Execute: 'static { + /// Execute async task in background. + /// + /// # Behavior + /// + /// - Implementor must manage the executing futures and keep making progress. + /// - Implementor must return `Error::Unexpected` if failed to execute new task. + fn execute(&self, f: BoxedStaticFuture<()>) -> Result<()>; +} + +/// Task is generated by Executor that represents an executing task. +/// +/// Users can fetch the results by calling `poll` or `.await` on this task. +/// Or, users can cancel the task by `drop` this task handle. +/// +/// # Notes +/// +/// Users don't need to call `poll` to make progress. All tasks are running in +/// the background. +pub struct Task { + handle: RemoteHandle, +} + +impl Task { + /// Create a new task. + #[inline] + #[allow(unused)] + pub fn new(handle: RemoteHandle) -> Self { + Self { handle } + } +} + +impl Future for Task { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.handle.poll_unpin(cx) + } +} diff --git a/core/src/types/execute/executor.rs b/core/src/types/execute/executor.rs new file mode 100644 index 000000000000..4d1be2994255 --- /dev/null +++ b/core/src/types/execute/executor.rs @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::*; +use crate::raw::MaybeSend; +use crate::*; +use futures::FutureExt; +use std::future::Future; +use std::sync::Arc; + +/// Executor that runs futures in background. +/// +/// Executor is created by users and used by opendal. So it's by design that Executor only +/// expose constructor methods. +/// +/// Executor will run futures in background and return a [`Task`] as handle to the future. Users +/// can call `task.await` to wait for the future to complete or drop the `Task` to cancel it. +pub struct Executor { + executor: Arc, +} + +#[cfg(feature = "executors-tokio")] +impl Default for Executor { + fn default() -> Self { + Self::new() + } +} + +impl Executor { + /// Create a default executor. + /// + /// The default executor is enabled by feature flags. + #[cfg(feature = "executors-tokio")] + pub fn new() -> Self { + Self::with(executors::TokioExecutor::default()) + } + + /// Create a new executor with given execute impl. + pub fn with(exec: impl Execute) -> Self { + Self { + executor: Arc::new(exec), + } + } + + /// Run given future in background immediately. + #[allow(unused)] + pub(crate) fn execute(&self, f: F) -> Result> + where + F: Future + MaybeSend + 'static, + F::Output: MaybeSend + 'static, + { + let (fut, handle) = f.remote_handle(); + self.executor.execute(Box::pin(fut))?; + Ok(Task::new(handle)) + } +} diff --git a/core/src/types/execute/executors/mod.rs b/core/src/types/execute/executors/mod.rs new file mode 100644 index 000000000000..d3e79f372339 --- /dev/null +++ b/core/src/types/execute/executors/mod.rs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! executors module provides implementations for the [`Execute`] trait for widely used runtimes. +//! +//! Every executor will be hide behind the feature like `executors-xxx`. Users can switch or enable +//! the executors they want by enabling the corresponding feature. Also, users can provide their +//! own executor by implementing the [`Execute`] trait directly. + +#[cfg(feature = "executors-tokio")] +mod tokio_executor; +#[cfg(feature = "executors-tokio")] +pub use tokio_executor::TokioExecutor; diff --git a/core/src/types/execute/executors/tokio_executor.rs b/core/src/types/execute/executors/tokio_executor.rs new file mode 100644 index 000000000000..73ecd87ec37e --- /dev/null +++ b/core/src/types/execute/executors/tokio_executor.rs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::raw::BoxedStaticFuture; +use crate::*; + +/// Executor that uses the [`tokio::task::spawn`] to execute futures. +#[derive(Default)] +pub struct TokioExecutor {} + +impl Execute for TokioExecutor { + /// Tokio's JoinHandle has it's own `abort` support, so dropping handle won't cancel the task. + fn execute(&self, f: BoxedStaticFuture<()>) -> Result<()> { + let _handle = tokio::task::spawn(f); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::Executor; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + use std::time::Duration; + use tokio::time::sleep; + + #[tokio::test] + async fn test_tokio_executor() { + let executor = Executor::with(TokioExecutor::default()); + + let finished = Arc::new(AtomicBool::new(false)); + + let finished_clone = finished.clone(); + let _task = executor + .execute(async move { + sleep(Duration::from_secs(1)).await; + finished_clone.store(true, Ordering::Relaxed); + }) + .unwrap(); + + sleep(Duration::from_secs(2)).await; + // Task must has been finished even without await task. + assert!(finished.load(Ordering::Relaxed)) + } +} diff --git a/core/src/types/execute/mod.rs b/core/src/types/execute/mod.rs new file mode 100644 index 000000000000..43f52150b584 --- /dev/null +++ b/core/src/types/execute/mod.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod api; +pub use api::Execute; +pub(crate) use api::Task; + +mod executor; +pub use executor::Executor; + +pub mod executors; diff --git a/core/src/types/mod.rs b/core/src/types/mod.rs index 6749b7967426..7aefe438417f 100644 --- a/core/src/types/mod.rs +++ b/core/src/types/mod.rs @@ -44,6 +44,9 @@ mod list; pub use list::BlockingLister; pub use list::Lister; +mod execute; +pub use execute::*; + mod operator; pub use operator::operator_functions; pub use operator::operator_futures;