Skip to content

Commit

Permalink
Perf: replace async_trait with RPITIT
Browse files Browse the repository at this point in the history
  • Loading branch information
hadronzoo authored and drmingdrmer committed Jan 28, 2024
1 parent 87713ec commit 06b431b
Show file tree
Hide file tree
Showing 35 changed files with 108 additions and 113 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ repository = "https://github.com/datafuselabs/openraft"
anyerror = { version = "0.1.10" }
anyhow = "1.0.63"
async-entry = "0.3.1"
async-trait = "0.1.36"
byte-unit = "4.0.12"
bytes = "1.0"
clap = { version = "4.1.11", features = ["derive", "env"] }
Expand All @@ -27,9 +26,12 @@ futures = "0.3"
lazy_static = "1.4.0"
maplit = "1.0.2"
pretty_assertions = "1.0.0"
proc-macro2 = "1.0"
quote = "1.0"
rand = "0.8"
serde = { version="1.0.114", features=["derive", "rc"]}
serde_json = "1.0.57"
syn = "2.0"
tempfile = { version = "3.4.0" }
thiserror = "1.0.49"
tokio = { version="1.8", default-features=false, features=["fs", "io-util", "macros", "rt", "rt-multi-thread", "sync", "time"] }
Expand Down
3 changes: 0 additions & 3 deletions cluster_benchmark/tests/benchmark/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;

use openraft::async_trait::async_trait;
use openraft::error::InstallSnapshotError;
use openraft::error::RPCError;
use openraft::error::RaftError;
Expand Down Expand Up @@ -80,7 +79,6 @@ impl Router {
}
}

#[async_trait]
impl RaftNetworkFactory<MemConfig> for Router {
type Network = Network;

Expand All @@ -97,7 +95,6 @@ pub struct Network {
target_raft: BenchRaft,
}

#[async_trait]
impl RaftNetwork<MemConfig> for Network {
async fn append_entries(
&mut self,
Expand Down
5 changes: 0 additions & 5 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use openraft::async_trait::async_trait;
use openraft::storage::LogFlushed;
use openraft::storage::LogState;
use openraft::storage::RaftLogReader;
Expand Down Expand Up @@ -100,7 +99,6 @@ impl StateMachineStore {
}
}

#[async_trait]
impl RaftLogReader<TypeConfig> for Arc<LogStore> {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
&mut self,
Expand All @@ -118,7 +116,6 @@ impl RaftLogReader<TypeConfig> for Arc<LogStore> {
}
}

#[async_trait]
impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
#[tracing::instrument(level = "trace", skip(self))]
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<NodeId>> {
Expand Down Expand Up @@ -170,7 +167,6 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
}
}

#[async_trait]
impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
async fn get_log_state(&mut self) -> Result<LogState<TypeConfig>, StorageError<NodeId>> {
let log = self.log.read().await;
Expand Down Expand Up @@ -244,7 +240,6 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
}
}

#[async_trait]
impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
async fn applied_state(
&mut self,
Expand Down
3 changes: 1 addition & 2 deletions cluster_benchmark/tests/benchmark/store/test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::Arc;

use openraft::async_trait::async_trait;
use openraft::testing::StoreBuilder;
use openraft::testing::Suite;
use openraft::StorageError;
Expand All @@ -11,7 +10,7 @@ use crate::store::StateMachineStore;
use crate::store::TypeConfig;

struct Builder {}
#[async_trait]

impl StoreBuilder<TypeConfig, Arc<LogStore>, Arc<StateMachineStore>> for Builder {
async fn build(&self) -> Result<((), Arc<LogStore>, Arc<StateMachineStore>), StorageError<NodeId>> {
let log_store = LogStore::new_async().await;
Expand Down
3 changes: 0 additions & 3 deletions examples/raft-kv-memstore-singlethreaded/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use openraft::add_async_trait;
use openraft::error::InstallSnapshotError;
use openraft::error::RemoteError;
use openraft::raft::AppendEntriesRequest;
Expand All @@ -21,7 +20,6 @@ pub struct Connection {
target: NodeId,
}

#[add_async_trait]
impl RaftNetworkFactory<TypeConfig> for Router {
type Network = Connection;

Expand All @@ -33,7 +31,6 @@ impl RaftNetworkFactory<TypeConfig> for Router {
}
}

#[add_async_trait]
impl RaftNetwork<TypeConfig> for Connection {
async fn send_append_entries(
&mut self,
Expand Down
5 changes: 0 additions & 5 deletions examples/raft-kv-memstore-singlethreaded/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::marker::PhantomData;
use std::ops::RangeBounds;
use std::rc::Rc;

use openraft::add_async_trait;
use openraft::storage::LogFlushed;
use openraft::storage::LogState;
use openraft::storage::RaftLogStorage;
Expand Down Expand Up @@ -120,7 +119,6 @@ pub struct LogStore {
vote: RefCell<Option<Vote<NodeId>>>,
}

#[add_async_trait]
impl RaftLogReader<TypeConfig> for Rc<LogStore> {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug>(
&mut self,
Expand All @@ -132,7 +130,6 @@ impl RaftLogReader<TypeConfig> for Rc<LogStore> {
}
}

#[add_async_trait]
impl RaftSnapshotBuilder<TypeConfig> for Rc<StateMachineStore> {
#[tracing::instrument(level = "trace", skip(self))]
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<NodeId>> {
Expand Down Expand Up @@ -184,7 +181,6 @@ impl RaftSnapshotBuilder<TypeConfig> for Rc<StateMachineStore> {
}
}

#[add_async_trait]
impl RaftStateMachine<TypeConfig> for Rc<StateMachineStore> {
type SnapshotBuilder = Self;

Expand Down Expand Up @@ -282,7 +278,6 @@ impl RaftStateMachine<TypeConfig> for Rc<StateMachineStore> {
}
}

#[add_async_trait]
impl RaftLogStorage<TypeConfig> for Rc<LogStore> {
type LogReader = Self;

Expand Down
3 changes: 0 additions & 3 deletions examples/raft-kv-memstore/src/network/raft_network_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use async_trait::async_trait;
use openraft::error::InstallSnapshotError;
use openraft::error::NetworkError;
use openraft::error::RemoteError;
Expand Down Expand Up @@ -59,7 +58,6 @@ impl Network {

// NOTE: This could be implemented also on `Arc<ExampleNetwork>`, but since it's empty, implemented
// directly.
#[async_trait]
impl RaftNetworkFactory<TypeConfig> for Network {
type Network = NetworkConnection;

Expand All @@ -78,7 +76,6 @@ pub struct NetworkConnection {
target_node: BasicNode,
}

#[async_trait]
impl RaftNetwork<TypeConfig> for NetworkConnection {
async fn send_append_entries(
&mut self,
Expand Down
5 changes: 0 additions & 5 deletions examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::ops::RangeBounds;
use std::sync::Arc;
use std::sync::Mutex;

use openraft::async_trait::async_trait;
use openraft::storage::LogFlushed;
use openraft::storage::LogState;
use openraft::storage::RaftLogStorage;
Expand Down Expand Up @@ -101,7 +100,6 @@ pub struct LogStore {
vote: RwLock<Option<Vote<NodeId>>>,
}

#[async_trait]
impl RaftLogReader<TypeConfig> for Arc<LogStore> {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
&mut self,
Expand All @@ -113,7 +111,6 @@ impl RaftLogReader<TypeConfig> for Arc<LogStore> {
}
}

#[async_trait]
impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
#[tracing::instrument(level = "trace", skip(self))]
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<NodeId>> {
Expand Down Expand Up @@ -165,7 +162,6 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {
}
}

#[async_trait]
impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
type SnapshotBuilder = Self;

Expand Down Expand Up @@ -263,7 +259,6 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
}
}

#[async_trait]
impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
type LogReader = Self;

Expand Down
3 changes: 0 additions & 3 deletions examples/raft-kv-rocksdb/src/network/raft_network_impl.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::any::Any;
use std::fmt::Display;

use async_trait::async_trait;
use openraft::error::InstallSnapshotError;
use openraft::error::NetworkError;
use openraft::error::RPCError;
Expand Down Expand Up @@ -29,7 +28,6 @@ pub struct Network {}

// NOTE: This could be implemented also on `Arc<ExampleNetwork>`, but since it's empty, implemented
// directly.
#[async_trait]
impl RaftNetworkFactory<TypeConfig> for Network {
type Network = NetworkConnection;

Expand Down Expand Up @@ -112,7 +110,6 @@ fn to_error<E: std::error::Error + 'static + Clone>(e: toy_rpc::Error, target: N
// = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#blocks_in_conditions
// = note: `#[warn(clippy::blocks_in_conditions)]` on by default
#[allow(clippy::blocks_in_conditions)]
#[async_trait]
impl RaftNetwork<TypeConfig> for NetworkConnection {
#[tracing::instrument(level = "debug", skip_all, err(Debug))]
async fn send_append_entries(
Expand Down
5 changes: 0 additions & 5 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use async_std::sync::RwLock;
use byteorder::BigEndian;
use byteorder::ReadBytesExt;
use byteorder::WriteBytesExt;
use openraft::async_trait::async_trait;
use openraft::storage::LogFlushed;
use openraft::storage::LogState;
use openraft::storage::RaftLogStorage;
Expand Down Expand Up @@ -99,7 +98,6 @@ pub struct StateMachineData {
pub kvs: Arc<RwLock<BTreeMap<String, String>>>,
}

#[async_trait]
impl RaftSnapshotBuilder<TypeConfig> for StateMachineStore {
async fn build_snapshot(&mut self) -> Result<Snapshot<TypeConfig>, StorageError<NodeId>> {
let last_applied_log = self.data.last_applied_log_id;
Expand Down Expand Up @@ -198,7 +196,6 @@ impl StateMachineStore {
}
}

#[async_trait]
impl RaftStateMachine<TypeConfig> for StateMachineStore {
type SnapshotBuilder = Self;

Expand Down Expand Up @@ -370,7 +367,6 @@ impl LogStore {
}
}

#[async_trait]
impl RaftLogReader<TypeConfig> for LogStore {
async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
&mut self,
Expand Down Expand Up @@ -399,7 +395,6 @@ impl RaftLogReader<TypeConfig> for LogStore {
}
}

#[async_trait]
impl RaftLogStorage<TypeConfig> for LogStore {
type LogReader = Self;

Expand Down
7 changes: 6 additions & 1 deletion macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ repository = { workspace = true }
[lib]
proc-macro = true

[dependencies]
proc-macro2 = { workspace = true }
quote = { workspace = true }
syn = { workspace = true, features = ["full"] }

[features]

# Do not use `async_trait` and do not add `Send` bounds.
# Do not add `Send` bounds.
singlethreaded = []
Loading

0 comments on commit 06b431b

Please sign in to comment.