From abdc33c3b4fb04cd2099aaa34043ac5420b3d065 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?19=E5=B9=B4=E6=A2=A6=E9=86=92?= <3949379+getong@users.noreply.github.com> Date: Mon, 29 Jan 2024 04:21:50 +0800 Subject: [PATCH] Refactor: delete async-trait, fix typo, use tokio to replace async_std --- .../Cargo.toml | 1 - examples/raft-kv-memstore/Cargo.toml | 1 - examples/raft-kv-rocksdb/Cargo.toml | 10 +++++++--- examples/raft-kv-rocksdb/src/app.rs | 4 ++-- examples/raft-kv-rocksdb/src/bin/main.rs | 2 +- examples/raft-kv-rocksdb/src/lib.rs | 12 ++++++------ .../raft-kv-rocksdb/src/network/management.rs | 2 +- examples/raft-kv-rocksdb/src/store.rs | 2 +- .../tests/cluster/test_cluster.rs | 19 +++++++++++-------- 9 files changed, 29 insertions(+), 24 deletions(-) diff --git a/examples/raft-kv-memstore-singlethreaded/Cargo.toml b/examples/raft-kv-memstore-singlethreaded/Cargo.toml index f75cd810a..f1ce56fa4 100644 --- a/examples/raft-kv-memstore-singlethreaded/Cargo.toml +++ b/examples/raft-kv-memstore-singlethreaded/Cargo.toml @@ -18,7 +18,6 @@ repository = "https://github.com/datafuselabs/openraft" [dependencies] openraft = { path = "../../openraft", features = ["serde", "storage-v2", "singlethreaded"] } -async-trait = "0.1.36" clap = { version = "4.1.11", features = ["derive", "env"] } reqwest = { version = "0.11.9", features = ["json"] } serde = { version = "1.0.114", features = ["derive"] } diff --git a/examples/raft-kv-memstore/Cargo.toml b/examples/raft-kv-memstore/Cargo.toml index 4bb1a8d55..55958d221 100644 --- a/examples/raft-kv-memstore/Cargo.toml +++ b/examples/raft-kv-memstore/Cargo.toml @@ -23,7 +23,6 @@ path = "src/bin/main.rs" openraft = { path = "../../openraft", features = ["serde", "storage-v2"] } actix-web = "4.0.0-rc.2" -async-trait = "0.1.36" clap = { version = "4.1.11", features = ["derive", "env"] } reqwest = { version = "0.11.9", features = ["json"] } serde = { version = "1.0.114", features = ["derive"] } diff --git a/examples/raft-kv-rocksdb/Cargo.toml b/examples/raft-kv-rocksdb/Cargo.toml index d3d1178b8..5ec801317 100644 --- a/examples/raft-kv-rocksdb/Cargo.toml +++ b/examples/raft-kv-rocksdb/Cargo.toml @@ -23,8 +23,7 @@ path = "src/bin/main.rs" [dependencies] openraft = { path = "../../openraft", features = ["serde", "storage-v2"] } -async-std = { version = "1.12.0", features = ["attributes", "tokio1"] } -async-trait = "0.1.36" +tokio = { version = "1.35.1", features = ["full"] } byteorder = "1.4.3" clap = { version = "4.1.11", features = ["derive", "env"] } reqwest = { version = "0.11.9", features = ["json"] } @@ -34,7 +33,12 @@ serde_json = "1.0.57" tide = { version = "0.16" } # for toy-rpc, use `serde_json` instead of the default `serde_bincode`: # bincode which enabled by default by toy-rpc, does not support `#[serde(flatten)]`: https://docs.rs/bincode/2.0.0-alpha.1/bincode/serde/index.html#known-issues -toy-rpc = { version = "0.8.6", default-features = false, features = [ "serde_json", "ws_async_std", "server", "client", "async_std_runtime", ] } +toy-rpc = { version = "0.8.6", features = [ + "ws_tokio", + "server", + "client", + "tokio_runtime", +] } tracing = "0.1.40" tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } diff --git a/examples/raft-kv-rocksdb/src/app.rs b/examples/raft-kv-rocksdb/src/app.rs index 84cafd764..227ac0f7f 100644 --- a/examples/raft-kv-rocksdb/src/app.rs +++ b/examples/raft-kv-rocksdb/src/app.rs @@ -1,8 +1,8 @@ use std::collections::BTreeMap; use std::sync::Arc; -use async_std::sync::RwLock; use openraft::Config; +use tokio::sync::RwLock; use crate::ExampleRaft; use crate::NodeId; @@ -12,7 +12,7 @@ use crate::NodeId; pub struct App { pub id: NodeId, pub api_addr: String, - pub rcp_addr: String, + pub rpc_addr: String, pub raft: ExampleRaft, pub key_values: Arc>>, pub config: Arc, diff --git a/examples/raft-kv-rocksdb/src/bin/main.rs b/examples/raft-kv-rocksdb/src/bin/main.rs index aad600bda..117603260 100644 --- a/examples/raft-kv-rocksdb/src/bin/main.rs +++ b/examples/raft-kv-rocksdb/src/bin/main.rs @@ -15,7 +15,7 @@ pub struct Opt { pub rpc_addr: String, } -#[async_std::main] +#[tokio::main] async fn main() -> std::io::Result<()> { // Setup the logger tracing_subscriber::fmt() diff --git a/examples/raft-kv-rocksdb/src/lib.rs b/examples/raft-kv-rocksdb/src/lib.rs index 72349219f..de2fb90b7 100644 --- a/examples/raft-kv-rocksdb/src/lib.rs +++ b/examples/raft-kv-rocksdb/src/lib.rs @@ -6,10 +6,10 @@ use std::io::Cursor; use std::path::Path; use std::sync::Arc; -use async_std::net::TcpListener; -use async_std::task; use openraft::Config; use openraft::TokioRuntime; +use tokio::net::TcpListener; +use tokio::task; use crate::app::App; use crate::network::api; @@ -79,7 +79,7 @@ pub async fn start_example_raft_node

( node_id: NodeId, dir: P, http_addr: String, - rcp_addr: String, + rpc_addr: String, ) -> std::io::Result<()> where P: AsRef, @@ -107,7 +107,7 @@ where let app = Arc::new(App { id: node_id, api_addr: http_addr.clone(), - rcp_addr: rcp_addr.clone(), + rpc_addr: rpc_addr.clone(), raft, key_values: kvs, config, @@ -117,7 +117,7 @@ where let server = toy_rpc::Server::builder().register(echo_service).build(); - let listener = TcpListener::bind(rcp_addr).await.unwrap(); + let listener = TcpListener::bind(rpc_addr).await.unwrap(); let handle = task::spawn(async move { server.accept_websocket(listener).await.unwrap(); }); @@ -130,6 +130,6 @@ where api::rest(&mut app); app.listen(http_addr).await?; - handle.await; + _ = handle.await; Ok(()) } diff --git a/examples/raft-kv-rocksdb/src/network/management.rs b/examples/raft-kv-rocksdb/src/network/management.rs index 798d8189a..20ba75de8 100644 --- a/examples/raft-kv-rocksdb/src/network/management.rs +++ b/examples/raft-kv-rocksdb/src/network/management.rs @@ -48,7 +48,7 @@ async fn init(req: Request>) -> tide::Result { let mut nodes = BTreeMap::new(); let node = Node { api_addr: req.state().api_addr.clone(), - rpc_addr: req.state().rcp_addr.clone(), + rpc_addr: req.state().rpc_addr.clone(), }; nodes.insert(req.state().id, node); diff --git a/examples/raft-kv-rocksdb/src/store.rs b/examples/raft-kv-rocksdb/src/store.rs index 2e7c7f421..506f9002d 100644 --- a/examples/raft-kv-rocksdb/src/store.rs +++ b/examples/raft-kv-rocksdb/src/store.rs @@ -5,7 +5,6 @@ use std::ops::RangeBounds; use std::path::Path; use std::sync::Arc; -use async_std::sync::RwLock; use byteorder::BigEndian; use byteorder::ReadBytesExt; use byteorder::WriteBytesExt; @@ -35,6 +34,7 @@ use rocksdb::Options; use rocksdb::DB; use serde::Deserialize; use serde::Serialize; +use tokio::sync::RwLock; use crate::typ; use crate::Node; diff --git a/examples/raft-kv-rocksdb/tests/cluster/test_cluster.rs b/examples/raft-kv-rocksdb/tests/cluster/test_cluster.rs index 0ee552ba2..ed37360e5 100644 --- a/examples/raft-kv-rocksdb/tests/cluster/test_cluster.rs +++ b/examples/raft-kv-rocksdb/tests/cluster/test_cluster.rs @@ -4,13 +4,13 @@ use std::panic::PanicInfo; use std::thread; use std::time::Duration; -use async_std::task::block_on; use maplit::btreemap; use maplit::btreeset; use raft_kv_rocksdb::client::ExampleClient; use raft_kv_rocksdb::start_example_raft_node; use raft_kv_rocksdb::store::Request; use raft_kv_rocksdb::Node; +use tokio::runtime::Handle; use tracing_subscriber::EnvFilter; pub fn log_panic(panic: &PanicInfo) { @@ -36,7 +36,7 @@ pub fn log_panic(panic: &PanicInfo) { /// Setup a cluster of 3 nodes. /// Write to it and read from it. -#[async_std::test(flavor = "multi_thread", worker_threads = 8)] +#[tokio::test(flavor = "multi_thread", worker_threads = 8)] async fn test_cluster() -> Result<(), Box> { // --- The client itself does not store addresses for all nodes, but just node id. // Thus we need a supporting component to provide mapping from node id to node address. @@ -77,23 +77,26 @@ async fn test_cluster() -> Result<(), Box> { let d2 = tempfile::TempDir::new()?; let d3 = tempfile::TempDir::new()?; + let handle = Handle::current(); + let handle_clone = handle.clone(); let _h1 = thread::spawn(move || { - let x = block_on(start_example_raft_node(1, d1.path(), get_addr(1), get_rpc_addr(1))); + let x = handle_clone.block_on(start_example_raft_node(1, d1.path(), get_addr(1), get_rpc_addr(1))); println!("x: {:?}", x); }); + let handle_clone = handle.clone(); let _h2 = thread::spawn(move || { - let x = block_on(start_example_raft_node(2, d2.path(), get_addr(2), get_rpc_addr(2))); + let x = handle_clone.block_on(start_example_raft_node(2, d2.path(), get_addr(2), get_rpc_addr(2))); println!("x: {:?}", x); }); let _h3 = thread::spawn(move || { - let x = block_on(start_example_raft_node(3, d3.path(), get_addr(3), get_rpc_addr(3))); + let x = handle.block_on(start_example_raft_node(3, d3.path(), get_addr(3), get_rpc_addr(3))); println!("x: {:?}", x); }); // Wait for server to start up. - async_std::task::sleep(Duration::from_millis(1_000)).await; + tokio::time::sleep(Duration::from_millis(1_000)).await; // --- Create a client to the first node, as a control handle to the cluster. @@ -172,7 +175,7 @@ async fn test_cluster() -> Result<(), Box> { // --- Wait for a while to let the replication get done. - async_std::task::sleep(Duration::from_millis(500)).await; + tokio::time::sleep(Duration::from_millis(500)).await; // --- Read it on every node. @@ -200,7 +203,7 @@ async fn test_cluster() -> Result<(), Box> { }) .await?; - async_std::task::sleep(Duration::from_millis(500)).await; + tokio::time::sleep(Duration::from_millis(500)).await; // --- Read it on every node.