Skip to content

Commit

Permalink
feat: Add Executor struct and Execute trait (#4648)
Browse files Browse the repository at this point in the history
* feat: Add Executor struct and Execute trait

Signed-off-by: Xuanwo <[email protected]>

* Fix licenses

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored May 27, 2024
1 parent b248bd5 commit bfe9e44
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 54 deletions.
111 changes: 57 additions & 54 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ default-members = ["."]
members = [".", "fuzz", "edge/*", "benches/vs_*"]

[features]
default = ["reqwest/rustls-tls", "services-memory"]
default = ["reqwest/rustls-tls", "executors-tokio", "services-memory"]

# Build test utils or not.
#
Expand All @@ -48,15 +48,15 @@ default = ["reqwest/rustls-tls", "services-memory"]
#
# You should never enable this feature unless you are developing opendal.
tests = [
"dep:rand",
"dep:sha2",
"dep:dotenvy",
"layers-blocking",
"services-azblob",
"services-fs",
"services-http",
"services-memory",
"services-s3",
"dep:rand",
"dep:sha2",
"dep:dotenvy",
"layers-blocking",
"services-azblob",
"services-fs",
"services-http",
"services-memory",
"services-s3",
]

# Enable path cache.
Expand All @@ -65,6 +65,9 @@ internal-path-cache = ["dep:moka"]
# Enable tokio runtime.
internal-tokio-rt = ["tokio/rt-multi-thread"]

# Enable tokio executors support.
executors-tokio = ["tokio/rt"]

# Enable layers chaos support
layers-chaos = ["dep:rand"]
# Enable layers metrics support
Expand Down Expand Up @@ -94,30 +97,30 @@ layers-dtrace = ["dep:probe"]
services-alluxio = []
services-atomicserver = ["dep:atomic_lib"]
services-azblob = [
"dep:sha2",
"dep:reqsign",
"reqsign?/services-azblob",
"reqsign?/reqwest_request",
"dep:sha2",
"dep:reqsign",
"reqsign?/services-azblob",
"reqsign?/reqwest_request",
]
services-azdls = [
"dep:reqsign",
"reqsign?/services-azblob",
"reqsign?/reqwest_request",
"dep:reqsign",
"reqsign?/services-azblob",
"reqsign?/reqwest_request",
]
services-azfile = [
"dep:reqsign",
"reqsign?/services-azblob",
"reqsign?/reqwest_request",
"dep:reqsign",
"reqsign?/services-azblob",
"reqsign?/reqwest_request",
]
services-b2 = []
services-cacache = ["dep:cacache"]
services-chainsafe = []
services-cloudflare-kv = []
services-compfs = ["dep:compio"]
services-cos = [
"dep:reqsign",
"reqsign?/services-tencent",
"reqsign?/reqwest_request",
"dep:reqsign",
"reqsign?/services-tencent",
"reqsign?/reqwest_request",
]
services-d1 = []
services-dashmap = ["dep:dashmap"]
Expand All @@ -128,9 +131,9 @@ services-foundationdb = ["dep:foundationdb"]
services-fs = ["tokio/fs"]
services-ftp = ["dep:suppaftp", "dep:bb8", "dep:async-tls"]
services-gcs = [
"dep:reqsign",
"reqsign?/services-google",
"reqsign?/reqwest_request",
"dep:reqsign",
"reqsign?/services-google",
"reqsign?/reqwest_request",
]
services-gdrive = ["internal-path-cache"]
services-ghac = []
Expand All @@ -152,15 +155,15 @@ services-moka = ["dep:moka"]
services-mongodb = ["dep:mongodb"]
services-mysql = ["dep:mysql_async"]
services-obs = [
"dep:reqsign",
"reqsign?/services-huaweicloud",
"reqsign?/reqwest_request",
"dep:reqsign",
"reqsign?/services-huaweicloud",
"reqsign?/reqwest_request",
]
services-onedrive = []
services-oss = [
"dep:reqsign",
"reqsign?/services-aliyun",
"reqsign?/reqwest_request",
"dep:reqsign",
"reqsign?/services-aliyun",
"reqsign?/reqwest_request",
]
services-pcloud = []
services-persy = ["dep:persy", "internal-tokio-rt"]
Expand All @@ -170,10 +173,10 @@ services-redis = ["dep:redis", "redis?/tokio-rustls-comp"]
services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"]
services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"]
services-s3 = [
"dep:reqsign",
"reqsign?/services-aws",
"reqsign?/reqwest_request",
"dep:crc32c",
"dep:reqsign",
"reqsign?/services-aws",
"reqsign?/reqwest_request",
"dep:crc32c",
]
services-seafile = []
services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:bb8"]
Expand Down Expand Up @@ -225,8 +228,8 @@ backon = "0.4.3"
base64 = "0.22"
bytes = "1.6"
chrono = { version = "0.4.28", default-features = false, features = [
"clock",
"std",
"clock",
"std",
] }
flagset = "0.4"
futures = { version = "0.3", default-features = false, features = ["std"] }
Expand All @@ -238,7 +241,7 @@ once_cell = "1"
percent-encoding = "2"
quick-xml = { version = "0.31", features = ["serialize", "overlapped-lists"] }
reqwest = { version = "0.12.2", features = [
"stream",
"stream",
], default-features = false }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
Expand Down Expand Up @@ -269,16 +272,16 @@ bb8-postgres = { version = "0.8.1", optional = true }
tokio-postgres = { version = "0.7.8", optional = true }
# for services-cacache
cacache = { version = "13.0", default-features = false, features = [
"tokio-runtime",
"mmap",
"tokio-runtime",
"mmap",
], optional = true }
# for services-dashmap
dashmap = { version = "5.4", optional = true }
# for services-etcd
etcd-client = { version = "0.12", optional = true, features = ["tls"] }
# for services-foundationdb
foundationdb = { version = "0.8.0", features = [
"embedded-fdb-include",
"embedded-fdb-include",
], optional = true }
# for services-hdfs
hdrs = { version = "0.3.2", optional = true, features = ["async_file"] }
Expand All @@ -294,23 +297,23 @@ moka = { version = "0.12", optional = true, features = ["future", "sync"] }
mongodb = { version = "2.8.1", optional = true, features = ["tokio-runtime"] }
# for services-mysql
mysql_async = { version = "0.32.2", default-features = false, features = [
"default-rustls",
"default-rustls",
], optional = true }
# for services-sftp
openssh = { version = "0.10.0", optional = true }
openssh-sftp-client = { version = "0.14.0", optional = true, features = [
"openssh",
"tracing",
"openssh",
"tracing",
] }
# for services-persy
persy = { version = "1.4.6", optional = true }
# for services-redb
redb = { version = "1.1.0", optional = true }
# for services-redis
redis = { version = "0.23.1", features = [
"cluster-async",
"tokio-comp",
"connection-manager",
"cluster-async",
"tokio-comp",
"connection-manager",
], optional = true }
# for services-rocksdb
rocksdb = { version = "0.21.0", default-features = false, optional = true }
Expand All @@ -320,9 +323,9 @@ rusqlite = { version = "0.31.0", optional = true, features = ["bundled"] }
sled = { version = "0.34.7", optional = true }
# for services-ftp
suppaftp = { version = "5.3.1", default-features = false, features = [
"async-secure",
"rustls",
"async-rustls",
"async-secure",
"rustls",
"async-rustls",
], optional = true }
# for services-tikv
tikv-client = { version = "0.3.0", optional = true, default-features = false }
Expand Down Expand Up @@ -368,14 +371,14 @@ dotenvy = "0.15"
libtest-mimic = "0.6"
minitrace = { version = "0.6", features = ["enable"] }
opentelemetry = { version = "0.21", default-features = false, features = [
"trace",
"trace",
] }
pretty_assertions = "1"
rand = "0.8"
sha2 = "0.10"
size = "0.4"
tokio = { version = "1.27", features = ["fs", "macros", "rt-multi-thread"] }
tracing-subscriber = { version = "0.3", features = [
"env-filter",
"tracing-log",
"env-filter",
"tracing-log",
] }
65 changes: 65 additions & 0 deletions core/src/types/execute/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::raw::BoxedStaticFuture;
use crate::*;
use futures::future::RemoteHandle;
use futures::FutureExt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Execute trait is used to execute task in background.
pub trait Execute: 'static {
/// Execute async task in background.
///
/// # Behavior
///
/// - Implementor must manage the executing futures and keep making progress.
/// - Implementor must return `Error::Unexpected` if failed to execute new task.
fn execute(&self, f: BoxedStaticFuture<()>) -> Result<()>;
}

/// Task is generated by Executor that represents an executing task.
///
/// Users can fetch the results by calling `poll` or `.await` on this task.
/// Or, users can cancel the task by `drop` this task handle.
///
/// # Notes
///
/// Users don't need to call `poll` to make progress. All tasks are running in
/// the background.
pub struct Task<T> {
handle: RemoteHandle<T>,
}

impl<T: 'static> Task<T> {
/// Create a new task.
#[inline]
#[allow(unused)]
pub fn new(handle: RemoteHandle<T>) -> Self {
Self { handle }
}
}

impl<T: 'static> Future for Task<T> {
type Output = T;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.handle.poll_unpin(cx)
}
}
70 changes: 70 additions & 0 deletions core/src/types/execute/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use super::*;
use crate::raw::MaybeSend;
use crate::*;
use futures::FutureExt;
use std::future::Future;
use std::sync::Arc;

/// Executor that runs futures in background.
///
/// Executor is created by users and used by opendal. So it's by design that Executor only
/// expose constructor methods.
///
/// Executor will run futures in background and return a [`Task`] as handle to the future. Users
/// can call `task.await` to wait for the future to complete or drop the `Task` to cancel it.
pub struct Executor {
executor: Arc<dyn Execute>,
}

#[cfg(feature = "executors-tokio")]
impl Default for Executor {
fn default() -> Self {
Self::new()
}
}

impl Executor {
/// Create a default executor.
///
/// The default executor is enabled by feature flags.
#[cfg(feature = "executors-tokio")]
pub fn new() -> Self {
Self::with(executors::TokioExecutor::default())
}

/// Create a new executor with given execute impl.
pub fn with(exec: impl Execute) -> Self {
Self {
executor: Arc::new(exec),
}
}

/// Run given future in background immediately.
#[allow(unused)]
pub(crate) fn execute<F>(&self, f: F) -> Result<Task<F::Output>>
where
F: Future + MaybeSend + 'static,
F::Output: MaybeSend + 'static,
{
let (fut, handle) = f.remote_handle();
self.executor.execute(Box::pin(fut))?;
Ok(Task::new(handle))
}
}
Loading

0 comments on commit bfe9e44

Please sign in to comment.